gateway/modules/routes/routeHelpers.py
2026-04-14 00:15:56 +02:00

534 lines
18 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Shared helpers for route handlers.
Provides unified logic for:
- mode=filterValues: distinct column values for filter dropdowns (cross-filtered)
- mode=ids: all IDs matching current filters (for bulk selection)
- In-memory equivalents for enriched/non-SQL routes
"""
import copy
import json
import logging
from typing import Any, Dict, List, Optional, Callable
from fastapi.responses import JSONResponse
from modules.datamodels.datamodelPagination import (
PaginationParams,
normalize_pagination_dict,
)
from modules.shared.i18nRegistry import resolveText
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Central FK label resolvers (cross-DB)
# ---------------------------------------------------------------------------
def _resolveMandateLabels(ids: List[str]) -> Dict[str, str]:
from modules.interfaces.interfaceDbApp import getRootInterface
rootIface = getRootInterface()
mMap = rootIface.getMandatesByIds(ids)
return {
mid: getattr(m, "label", None) or getattr(m, "name", mid) or mid
for mid, m in mMap.items()
}
def _resolveInstanceLabels(ids: List[str]) -> Dict[str, str]:
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
rootIface = getRootInterface()
featureIface = getFeatureInterface(rootIface.db)
result: Dict[str, str] = {}
for iid in ids:
fi = featureIface.getFeatureInstance(iid)
result[iid] = fi.label if fi and fi.label else iid
return result
def _resolveUserLabels(ids: List[str]) -> Dict[str, str]:
from modules.interfaces.interfaceDbApp import getRootInterface
rootIface = getRootInterface()
users = rootIface.db.getRecordset(
__import__("modules.datamodels.datamodelUam", fromlist=["User"]).User,
recordFilter={"id": list(set(ids))},
)
result: Dict[str, str] = {}
for u in (users or []):
uid = u.get("id", "")
result[uid] = u.get("username") or u.get("email") or uid
return result
_BUILTIN_FK_RESOLVERS: Dict[str, Callable[[List[str]], Dict[str, str]]] = {
"Mandate": _resolveMandateLabels,
"FeatureInstance": _resolveInstanceLabels,
"User": _resolveUserLabels,
}
def _buildLabelResolversFromModel(modelClass: type) -> Dict[str, Callable[[List[str]], Dict[str, str]]]:
"""
Auto-build labelResolvers dict from fk_model annotations on a Pydantic model.
Maps field names to resolver functions for all fields that have a known fk_model.
"""
from modules.connectors.connectorDbPostgre import _get_fk_sort_meta
fkMeta = _get_fk_sort_meta(modelClass)
resolvers: Dict[str, Callable[[List[str]], Dict[str, str]]] = {}
for fieldName, meta in fkMeta.items():
fkModelName = meta.get("model", "")
if fkModelName in _BUILTIN_FK_RESOLVERS:
resolvers[fieldName] = _BUILTIN_FK_RESOLVERS[fkModelName]
return resolvers
# ---------------------------------------------------------------------------
# Cross-filter pagination parsing
# ---------------------------------------------------------------------------
def parseCrossFilterPagination(
column: str,
paginationJson: Optional[str],
) -> Optional[PaginationParams]:
"""
Parse pagination JSON, remove the requested column from filters (cross-filtering),
and drop sort — used for filter-values requests.
"""
if not paginationJson:
return None
try:
paginationDict = json.loads(paginationJson)
if not paginationDict:
return None
paginationDict = normalize_pagination_dict(paginationDict)
filters = paginationDict.get("filters", {})
filters.pop(column, None)
paginationDict["filters"] = filters
paginationDict.pop("sort", None)
return PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError, TypeError):
return None
def parsePaginationForIds(
paginationJson: Optional[str],
) -> Optional[PaginationParams]:
"""
Parse pagination JSON for mode=ids — keep filters, drop sort and page/pageSize.
"""
if not paginationJson:
return None
try:
paginationDict = json.loads(paginationJson)
if not paginationDict:
return None
paginationDict = normalize_pagination_dict(paginationDict)
paginationDict.pop("sort", None)
return PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError, TypeError):
return None
# ---------------------------------------------------------------------------
# SQL-based helpers (delegate to DB connector)
# ---------------------------------------------------------------------------
def handleFilterValuesMode(
db,
modelClass: type,
column: str,
paginationJson: Optional[str] = None,
recordFilter: Optional[Dict[str, Any]] = None,
enrichFn: Optional[Callable[[str, Optional[PaginationParams], Optional[Dict[str, Any]]], List[str]]] = None,
) -> List[str]:
"""
SQL-based distinct column values with cross-filtering.
If enrichFn is provided and the column is enriched (computed/joined),
enrichFn(column, crossPagination, recordFilter) is called instead of SQL DISTINCT.
"""
crossPagination = parseCrossFilterPagination(column, paginationJson)
if enrichFn:
try:
result = enrichFn(column, crossPagination, recordFilter)
if result is not None:
return JSONResponse(content=result)
except Exception as e:
logger.warning(f"handleFilterValuesMode enrichFn failed for {column}: {e}")
try:
values = db.getDistinctColumnValues(
modelClass, column,
pagination=crossPagination,
recordFilter=recordFilter,
) or []
return JSONResponse(content=values)
except Exception as e:
logger.error(f"handleFilterValuesMode SQL failed for {modelClass.__name__}.{column}: {e}")
return JSONResponse(content=[])
def handleIdsMode(
db,
modelClass: type,
paginationJson: Optional[str] = None,
recordFilter: Optional[Dict[str, Any]] = None,
idField: str = "id",
) -> List[str]:
"""
Return all IDs matching the current filters (no LIMIT/OFFSET).
Uses the same WHERE clause as getRecordsetPaginated.
"""
pagination = parsePaginationForIds(paginationJson)
table = modelClass.__name__
try:
if not db._ensureTableExists(modelClass):
return JSONResponse(content=[])
where_clause, _, _, values, _ = db._buildPaginationClauses(
modelClass, pagination, recordFilter,
)
sql = f'SELECT "{idField}"::TEXT AS val FROM "{table}"{where_clause} ORDER BY "{idField}"'
with db.connection.cursor() as cursor:
cursor.execute(sql, values)
return JSONResponse(content=[row["val"] for row in cursor.fetchall()])
except Exception as e:
logger.error(f"handleIdsMode failed for {table}: {e}")
return JSONResponse(content=[])
# ---------------------------------------------------------------------------
# In-memory helpers (for enriched / non-SQL routes)
# ---------------------------------------------------------------------------
def _applyFiltersAndSort(
items: List[Dict[str, Any]],
paginationParams: Optional[PaginationParams],
) -> List[Dict[str, Any]]:
"""
Apply filters and sorting to a list of dicts in-memory.
Does NOT paginate (no page/pageSize slicing).
"""
if not paginationParams:
return items
result = list(items)
if paginationParams.filters:
filters = paginationParams.filters
searchTerm = filters.get("search", "").lower() if filters.get("search") else None
if searchTerm:
result = [
item for item in result
if any(
searchTerm in str(v).lower()
for v in item.values()
if v is not None
)
]
for field, filterValue in filters.items():
if field == "search":
continue
if isinstance(filterValue, dict) and "operator" in filterValue:
operator = filterValue.get("operator", "equals")
value = filterValue.get("value")
else:
operator = "equals"
value = filterValue
if value is None or value == "":
continue
result = [
item for item in result
if _matchesFilter(item, field, operator, value)
]
if paginationParams.sort:
for sortField in reversed(paginationParams.sort):
fieldName = sortField.field
ascending = sortField.direction == "asc"
noneItems = [item for item in result if item.get(fieldName) is None]
nonNoneItems = [item for item in result if item.get(fieldName) is not None]
def _getSortKey(item: Dict[str, Any], _fn=fieldName):
value = item.get(_fn)
if isinstance(value, bool):
return (0, int(value), "")
if isinstance(value, (int, float)):
return (0, value, "")
return (1, 0, str(value).lower())
nonNoneItems = sorted(nonNoneItems, key=_getSortKey, reverse=not ascending)
result = nonNoneItems + noneItems
return result
def _matchesFilter(item: Dict[str, Any], field: str, operator: str, value: Any) -> bool:
"""Single-field filter match for in-memory filtering."""
itemValue = item.get(field)
if itemValue is None:
return False
itemStr = str(itemValue).lower()
valueStr = str(value).lower()
if operator in ("equals", "eq"):
return itemStr == valueStr
if operator == "contains":
return valueStr in itemStr
if operator == "startsWith":
return itemStr.startswith(valueStr)
if operator == "endsWith":
return itemStr.endswith(valueStr)
if operator in ("gt", "gte", "lt", "lte"):
try:
itemNum = float(itemValue)
valueNum = float(value)
if operator == "gt":
return itemNum > valueNum
if operator == "gte":
return itemNum >= valueNum
if operator == "lt":
return itemNum < valueNum
return itemNum <= valueNum
except (ValueError, TypeError):
return False
if operator == "between":
return _matchesBetween(itemValue, itemStr, value)
if operator == "in":
if isinstance(value, list):
return itemStr in [str(x).lower() for x in value]
return False
if operator == "notIn":
if isinstance(value, list):
return itemStr not in [str(x).lower() for x in value]
return True
return True
def _matchesBetween(itemValue: Any, itemStr: str, value: Any) -> bool:
"""Handle 'between' operator for date ranges and numeric ranges."""
if not isinstance(value, dict):
return True
fromVal = value.get("from", "")
toVal = value.get("to", "")
if not fromVal and not toVal:
return True
try:
from datetime import datetime, timezone
fromTs = None
toTs = None
if fromVal:
fromTs = datetime.strptime(str(fromVal), "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()
if toVal:
toTs = datetime.strptime(str(toVal), "%Y-%m-%d").replace(
hour=23, minute=59, second=59, tzinfo=timezone.utc
).timestamp()
itemNum = float(itemValue) if not isinstance(itemValue, (int, float)) else itemValue
if itemNum > 10000000000:
itemNum = itemNum / 1000
if fromTs is not None and toTs is not None:
return fromTs <= itemNum <= toTs
if fromTs is not None:
return itemNum >= fromTs
if toTs is not None:
return itemNum <= toTs
except (ValueError, TypeError):
fromStr = str(fromVal).lower() if fromVal else ""
toStr = str(toVal).lower() if toVal else ""
if fromStr and toStr:
return fromStr <= itemStr <= toStr
if fromStr:
return itemStr >= fromStr
if toStr:
return itemStr <= toStr
return True
def _extractDistinctValues(
items: List[Dict[str, Any]],
columnKey: str,
requestLang: Optional[str] = None,
) -> List[str]:
"""Extract sorted distinct display values for a column from enriched items."""
values = set()
for item in items:
val = item.get(columnKey)
if val is None or val == "":
continue
if isinstance(val, bool):
values.add("true" if val else "false")
elif isinstance(val, (int, float)):
values.add(str(val))
elif isinstance(val, dict):
text = resolveText(val, requestLang)
if text:
values.add(text)
else:
values.add(str(val))
return sorted(values, key=lambda v: v.lower())
def handleFilterValuesInMemory(
items: List[Dict[str, Any]],
column: str,
paginationJson: Optional[str] = None,
requestLang: Optional[str] = None,
) -> JSONResponse:
"""
In-memory filter-values: apply cross-filters, then extract distinct values.
For routes that build enriched in-memory lists.
Returns JSONResponse to bypass FastAPI response_model validation.
"""
crossFilterParams = parseCrossFilterPagination(column, paginationJson)
crossFiltered = _applyFiltersAndSort(items, crossFilterParams)
return JSONResponse(content=_extractDistinctValues(crossFiltered, column, requestLang))
def handleIdsInMemory(
items: List[Dict[str, Any]],
paginationJson: Optional[str] = None,
idField: str = "id",
) -> JSONResponse:
"""
In-memory IDs: apply filters, return all IDs.
For routes that build enriched in-memory lists.
Returns JSONResponse to bypass FastAPI response_model validation.
"""
pagination = parsePaginationForIds(paginationJson)
filtered = _applyFiltersAndSort(items, pagination)
ids = []
for item in filtered:
val = item.get(idField)
if val is not None:
ids.append(str(val))
return JSONResponse(content=ids)
def getRecordsetPaginatedWithFkSort(
db,
modelClass: type,
pagination,
recordFilter: Optional[Dict[str, Any]] = None,
labelResolvers: Optional[Dict[str, Callable[[List[str]], Dict[str, str]]]] = None,
fieldFilter: Optional[List[str]] = None,
idField: str = "id",
) -> Dict[str, Any]:
"""
Wrapper around db.getRecordsetPaginated that handles FK-label sorting.
If the current sort field is a FK with a registered labelResolver, the
function fetches all filtered IDs + FK values, resolves labels cross-DB,
sorts in-memory by label, and returns only the requested page.
If no FK sort is active, delegates directly to db.getRecordsetPaginated.
"""
import math
if not pagination or not pagination.sort:
return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
if labelResolvers is None:
labelResolvers = _buildLabelResolversFromModel(modelClass)
if not labelResolvers:
return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
fkSortField = None
fkSortDir = "asc"
for sf in pagination.sort:
sfField = sf.get("field") if isinstance(sf, dict) else getattr(sf, "field", None)
sfDir = sf.get("direction", "asc") if isinstance(sf, dict) else getattr(sf, "direction", "asc")
if sfField and sfField in labelResolvers:
fkSortField = sfField
fkSortDir = str(sfDir).lower()
break
if not fkSortField:
return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
try:
distinctIds = db.getDistinctColumnValues(
modelClass, fkSortField, recordFilter=recordFilter,
) or []
labelMap = {}
if distinctIds:
try:
labelMap = labelResolvers[fkSortField](distinctIds)
except Exception as e:
logger.warning(f"getRecordsetPaginatedWithFkSort: resolver for {fkSortField} failed: {e}")
filterOnlyPagination = copy.deepcopy(pagination)
filterOnlyPagination.sort = []
filterOnlyPagination.page = 1
filterOnlyPagination.pageSize = 999999
lightRows = db.getRecordsetPaginated(
modelClass, filterOnlyPagination, recordFilter,
fieldFilter=[idField, fkSortField],
)
allRows = lightRows.get("items", [])
totalItems = len(allRows)
if totalItems == 0:
return {"items": [], "totalItems": 0, "totalPages": 0}
def _sortKey(row):
fkVal = row.get(fkSortField, "") or ""
label = labelMap.get(str(fkVal), str(fkVal)).lower()
return label
reverse = fkSortDir == "desc"
allRows.sort(key=_sortKey, reverse=reverse)
pageSize = pagination.pageSize
offset = (pagination.page - 1) * pageSize
pageSlice = allRows[offset:offset + pageSize]
pageIds = [row[idField] for row in pageSlice if row.get(idField)]
if not pageIds:
return {"items": [], "totalItems": totalItems, "totalPages": math.ceil(totalItems / pageSize)}
pageItems = db.getRecordset(modelClass, recordFilter={idField: pageIds}, fieldFilter=fieldFilter)
idOrder = {pid: idx for idx, pid in enumerate(pageIds)}
pageItems.sort(key=lambda r: idOrder.get(r.get(idField), 999999))
totalPages = math.ceil(totalItems / pageSize) if totalItems > 0 else 0
return {"items": pageItems, "totalItems": totalItems, "totalPages": totalPages}
except Exception as e:
logger.error(f"getRecordsetPaginatedWithFkSort failed for {modelClass.__name__}: {e}")
return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
def paginateInMemory(
items: List[Dict[str, Any]],
paginationParams: Optional[PaginationParams],
) -> tuple:
"""
Apply pagination (page/pageSize slicing) to an already-filtered+sorted list.
Returns (pageItems, totalItems).
"""
totalItems = len(items)
if not paginationParams:
return items, totalItems
offset = (paginationParams.page - 1) * paginationParams.pageSize
pageItems = items[offset:offset + paginationParams.pageSize]
return pageItems, totalItems