# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Shared helpers for route handlers.
|
|
|
|
Provides unified logic for:
|
|
- mode=filterValues: distinct column values for filter dropdowns (cross-filtered)
|
|
- mode=ids: all IDs matching current filters (for bulk selection)
|
|
- In-memory equivalents for enriched/non-SQL routes
|
|
"""
|
|
|
|
import copy
|
|
import json
|
|
import logging
|
|
from typing import Any, Dict, List, Optional, Callable
|
|
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from modules.datamodels.datamodelPagination import (
|
|
PaginationParams,
|
|
normalize_pagination_dict,
|
|
)
|
|
from modules.shared.i18nRegistry import resolveText
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Central FK label resolvers (cross-DB)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _resolveMandateLabels(ids: List[str]) -> Dict[str, str]:
    """Resolve mandate IDs to display labels via the root DB interface.

    Prefers the mandate's ``label`` attribute, then ``name``; falls back
    to the raw ID so every input ID gets an entry.
    """
    from modules.interfaces.interfaceDbApp import getRootInterface

    mandateMap = getRootInterface().getMandatesByIds(ids)
    labels: Dict[str, str] = {}
    for mandateId, mandate in mandateMap.items():
        labels[mandateId] = (
            getattr(mandate, "label", None)
            or getattr(mandate, "name", mandateId)
            or mandateId
        )
    return labels
|
|
|
|
|
|
def _resolveInstanceLabels(ids: List[str]) -> Dict[str, str]:
    """Resolve feature-instance IDs to labels via the feature interface.

    Falls back to the raw ID when the instance is missing or unlabeled.
    """
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.interfaces.interfaceFeatures import getFeatureInterface

    featureIface = getFeatureInterface(getRootInterface().db)
    labels: Dict[str, str] = {}
    for instanceId in ids:
        instance = featureIface.getFeatureInstance(instanceId)
        labels[instanceId] = instance.label if (instance and instance.label) else instanceId
    return labels
|
|
|
|
|
|
def _resolveUserLabels(ids: List[str]) -> Dict[str, str]:
    """Resolve user IDs to display labels (username, else email, else raw ID).

    Fix: replaces the opaque ``__import__(..., fromlist=["User"])`` hack with
    an ordinary local import — identical behavior, readable and tool-friendly.
    """
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.datamodels.datamodelUam import User

    rootIface = getRootInterface()
    users = rootIface.db.getRecordset(
        User,
        # Deduplicate so the generated IN clause stays minimal.
        recordFilter={"id": list(set(ids))},
    )
    result: Dict[str, str] = {}
    for u in (users or []):
        uid = u.get("id", "")
        result[uid] = u.get("username") or u.get("email") or uid
    return result
|
|
|
|
|
|
# Maps FK target model name -> resolver returning {id: display label}.
# Used by _buildLabelResolversFromModel to auto-wire FK-label sorting.
_BUILTIN_FK_RESOLVERS: Dict[str, Callable[[List[str]], Dict[str, str]]] = {
    "Mandate": _resolveMandateLabels,
    "FeatureInstance": _resolveInstanceLabels,
    "User": _resolveUserLabels,
}
|
|
|
|
|
|
def _buildLabelResolversFromModel(modelClass: type) -> Dict[str, Callable[[List[str]], Dict[str, str]]]:
    """
    Auto-build labelResolvers dict from fk_model annotations on a Pydantic model.

    Maps field names to resolver functions for all fields whose fk_model
    name has a builtin resolver registered in _BUILTIN_FK_RESOLVERS.
    """
    from modules.connectors.connectorDbPostgre import _get_fk_sort_meta

    fkMeta = _get_fk_sort_meta(modelClass)
    return {
        fieldName: _BUILTIN_FK_RESOLVERS[meta.get("model", "")]
        for fieldName, meta in fkMeta.items()
        if meta.get("model", "") in _BUILTIN_FK_RESOLVERS
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cross-filter pagination parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parseCrossFilterPagination(
    column: str,
    paginationJson: Optional[str],
) -> Optional[PaginationParams]:
    """
    Parse pagination JSON for a filter-values request.

    Removes the requested column from the filters (cross-filtering: a
    column's dropdown must not be narrowed by its own filter) and drops
    the sort (irrelevant for DISTINCT values).

    Returns None for missing/empty/invalid input.
    """
    if not paginationJson:
        return None
    try:
        parsed = json.loads(paginationJson)
        if not parsed:
            return None
        parsed = normalize_pagination_dict(parsed)
        remainingFilters = parsed.get("filters", {})
        remainingFilters.pop(column, None)  # cross-filter: exclude own column
        parsed["filters"] = remainingFilters
        parsed.pop("sort", None)
        return PaginationParams(**parsed)
    except (json.JSONDecodeError, ValueError, TypeError):
        return None
|
|
|
|
|
|
def parsePaginationForIds(
    paginationJson: Optional[str],
) -> Optional[PaginationParams]:
    """
    Parse pagination JSON for mode=ids — keep filters, drop sort.

    page/pageSize remain in the parsed params; mode=ids callers ignore
    them since all matching IDs are returned.

    Returns None for missing/empty/invalid input.
    """
    if not paginationJson:
        return None
    try:
        parsed = json.loads(paginationJson)
        if not parsed:
            return None
        parsed = normalize_pagination_dict(parsed)
        parsed.pop("sort", None)  # ordering is irrelevant for an ID set
        return PaginationParams(**parsed)
    except (json.JSONDecodeError, ValueError, TypeError):
        return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SQL-based helpers (delegate to DB connector)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def handleFilterValuesMode(
    db,
    modelClass: type,
    column: str,
    paginationJson: Optional[str] = None,
    recordFilter: Optional[Dict[str, Any]] = None,
    enrichFn: Optional[Callable[[str, Optional[PaginationParams], Optional[Dict[str, Any]]], List[str]]] = None,
) -> JSONResponse:
    """
    SQL-based distinct column values with cross-filtering.

    If enrichFn is provided and the column is enriched (computed/joined),
    enrichFn(column, crossPagination, recordFilter) is called instead of SQL DISTINCT.

    Fix: return annotation corrected from List[str] to JSONResponse —
    every path returns a JSONResponse (empty list content on error).
    """
    crossPagination = parseCrossFilterPagination(column, paginationJson)

    # Enriched/computed columns cannot be answered by SQL DISTINCT;
    # try the route-supplied resolver first, best-effort.
    if enrichFn:
        try:
            result = enrichFn(column, crossPagination, recordFilter)
            if result is not None:
                return JSONResponse(content=result)
        except Exception as e:
            # Fall through to plain SQL DISTINCT on resolver failure.
            logger.warning(f"handleFilterValuesMode enrichFn failed for {column}: {e}")

    try:
        values = db.getDistinctColumnValues(
            modelClass, column,
            pagination=crossPagination,
            recordFilter=recordFilter,
        ) or []
        return JSONResponse(content=values)
    except Exception as e:
        logger.error(f"handleFilterValuesMode SQL failed for {modelClass.__name__}.{column}: {e}")
        return JSONResponse(content=[])
|
|
|
|
|
|
def handleIdsMode(
    db,
    modelClass: type,
    paginationJson: Optional[str] = None,
    recordFilter: Optional[Dict[str, Any]] = None,
    idField: str = "id",
) -> JSONResponse:
    """
    Return all IDs matching the current filters (no LIMIT/OFFSET).
    Uses the same WHERE clause as getRecordsetPaginated.

    Fix: return annotation corrected from List[str] to JSONResponse —
    every path returns a JSONResponse (empty list content on error).
    """
    pagination = parsePaginationForIds(paginationJson)
    table = modelClass.__name__

    try:
        if not db._ensureTableExists(modelClass):
            return JSONResponse(content=[])

        # Reuse the connector's filter logic so the ID set matches the
        # recordset the user currently sees.
        where_clause, _, _, values, _ = db._buildPaginationClauses(
            modelClass, pagination, recordFilter,
        )

        # table/idField are code-supplied identifiers (never user input);
        # all filter values go through bind parameters in `values`.
        sql = f'SELECT "{idField}"::TEXT AS val FROM "{table}"{where_clause} ORDER BY "{idField}"'

        with db.connection.cursor() as cursor:
            cursor.execute(sql, values)
            return JSONResponse(content=[row["val"] for row in cursor.fetchall()])
    except Exception as e:
        logger.error(f"handleIdsMode failed for {table}: {e}")
        return JSONResponse(content=[])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# In-memory helpers (for enriched / non-SQL routes)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _applyFiltersAndSort(
    items: List[Dict[str, Any]],
    paginationParams: Optional[PaginationParams],
) -> List[Dict[str, Any]]:
    """
    Apply filters and sorting to a list of dicts in-memory.
    Does NOT paginate (no page/pageSize slicing).

    Filtering: a global "search" term (substring match over all values),
    then one pass per remaining filter via _matchesFilter. Filters may be
    plain values (implies "equals") or {"operator": ..., "value": ...} dicts.

    Sorting: multi-field sort emulated by applying a stable sort per field
    in reverse priority order; None values always sort last regardless of
    direction.
    """
    if not paginationParams:
        return items

    # Work on a copy; the caller's list is never mutated.
    result = list(items)

    if paginationParams.filters:
        filters = paginationParams.filters
        # NOTE(review): assumes filters["search"], when present, is a string
        # (str.lower is called on it) — confirm against callers.
        searchTerm = filters.get("search", "").lower() if filters.get("search") else None

        if searchTerm:
            # Global search: keep items where any non-None value contains the term.
            result = [
                item for item in result
                if any(
                    searchTerm in str(v).lower()
                    for v in item.values()
                    if v is not None
                )
            ]

        for field, filterValue in filters.items():
            if field == "search":
                continue  # already handled above

            # Structured filter: {"operator": ..., "value": ...}; otherwise
            # a bare value means an equality filter.
            if isinstance(filterValue, dict) and "operator" in filterValue:
                operator = filterValue.get("operator", "equals")
                value = filterValue.get("value")
            else:
                operator = "equals"
                value = filterValue

            # Empty filter values are treated as "no filter".
            if value is None or value == "":
                continue

            result = [
                item for item in result
                if _matchesFilter(item, field, operator, value)
            ]

    if paginationParams.sort:
        # Stable sorts applied lowest-priority-first emulate a multi-key sort.
        for sortField in reversed(paginationParams.sort):
            fieldName = sortField.field
            ascending = sortField.direction == "asc"

            # Split out None values so they always end up last.
            noneItems = [item for item in result if item.get(fieldName) is None]
            nonNoneItems = [item for item in result if item.get(fieldName) is not None]

            # _fn=fieldName binds the loop variable now (avoids the
            # late-binding closure pitfall).
            def _getSortKey(item: Dict[str, Any], _fn=fieldName):
                value = item.get(_fn)
                # Tuple key: numbers (group 0) sort before strings (group 1),
                # so mixed-type columns never raise TypeError.
                if isinstance(value, bool):
                    return (0, int(value), "")
                if isinstance(value, (int, float)):
                    return (0, value, "")
                return (1, 0, str(value).lower())

            nonNoneItems = sorted(nonNoneItems, key=_getSortKey, reverse=not ascending)
            result = nonNoneItems + noneItems

    return result
|
|
|
|
|
|
def _matchesFilter(item: Dict[str, Any], field: str, operator: str, value: Any) -> bool:
|
|
"""Single-field filter match for in-memory filtering."""
|
|
itemValue = item.get(field)
|
|
if itemValue is None:
|
|
return False
|
|
|
|
itemStr = str(itemValue).lower()
|
|
valueStr = str(value).lower()
|
|
|
|
if operator in ("equals", "eq"):
|
|
return itemStr == valueStr
|
|
if operator == "contains":
|
|
return valueStr in itemStr
|
|
if operator == "startsWith":
|
|
return itemStr.startswith(valueStr)
|
|
if operator == "endsWith":
|
|
return itemStr.endswith(valueStr)
|
|
if operator in ("gt", "gte", "lt", "lte"):
|
|
try:
|
|
itemNum = float(itemValue)
|
|
valueNum = float(value)
|
|
if operator == "gt":
|
|
return itemNum > valueNum
|
|
if operator == "gte":
|
|
return itemNum >= valueNum
|
|
if operator == "lt":
|
|
return itemNum < valueNum
|
|
return itemNum <= valueNum
|
|
except (ValueError, TypeError):
|
|
return False
|
|
if operator == "between":
|
|
return _matchesBetween(itemValue, itemStr, value)
|
|
if operator == "in":
|
|
if isinstance(value, list):
|
|
return itemStr in [str(x).lower() for x in value]
|
|
return False
|
|
if operator == "notIn":
|
|
if isinstance(value, list):
|
|
return itemStr not in [str(x).lower() for x in value]
|
|
return True
|
|
return True
|
|
|
|
|
|
def _matchesBetween(itemValue: Any, itemStr: str, value: Any) -> bool:
|
|
"""Handle 'between' operator for date ranges and numeric ranges."""
|
|
if not isinstance(value, dict):
|
|
return True
|
|
fromVal = value.get("from", "")
|
|
toVal = value.get("to", "")
|
|
if not fromVal and not toVal:
|
|
return True
|
|
try:
|
|
from datetime import datetime, timezone
|
|
fromTs = None
|
|
toTs = None
|
|
if fromVal:
|
|
fromTs = datetime.strptime(str(fromVal), "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()
|
|
if toVal:
|
|
toTs = datetime.strptime(str(toVal), "%Y-%m-%d").replace(
|
|
hour=23, minute=59, second=59, tzinfo=timezone.utc
|
|
).timestamp()
|
|
itemNum = float(itemValue) if not isinstance(itemValue, (int, float)) else itemValue
|
|
if itemNum > 10000000000:
|
|
itemNum = itemNum / 1000
|
|
if fromTs is not None and toTs is not None:
|
|
return fromTs <= itemNum <= toTs
|
|
if fromTs is not None:
|
|
return itemNum >= fromTs
|
|
if toTs is not None:
|
|
return itemNum <= toTs
|
|
except (ValueError, TypeError):
|
|
fromStr = str(fromVal).lower() if fromVal else ""
|
|
toStr = str(toVal).lower() if toVal else ""
|
|
if fromStr and toStr:
|
|
return fromStr <= itemStr <= toStr
|
|
if fromStr:
|
|
return itemStr >= fromStr
|
|
if toStr:
|
|
return itemStr <= toStr
|
|
return True
|
|
|
|
|
|
def _extractDistinctValues(
|
|
items: List[Dict[str, Any]],
|
|
columnKey: str,
|
|
requestLang: Optional[str] = None,
|
|
) -> List[str]:
|
|
"""Extract sorted distinct display values for a column from enriched items."""
|
|
values = set()
|
|
for item in items:
|
|
val = item.get(columnKey)
|
|
if val is None or val == "":
|
|
continue
|
|
if isinstance(val, bool):
|
|
values.add("true" if val else "false")
|
|
elif isinstance(val, (int, float)):
|
|
values.add(str(val))
|
|
elif isinstance(val, dict):
|
|
text = resolveText(val, requestLang)
|
|
if text:
|
|
values.add(text)
|
|
else:
|
|
values.add(str(val))
|
|
return sorted(values, key=lambda v: v.lower())
|
|
|
|
|
|
def handleFilterValuesInMemory(
    items: List[Dict[str, Any]],
    column: str,
    paginationJson: Optional[str] = None,
    requestLang: Optional[str] = None,
) -> JSONResponse:
    """
    In-memory filter-values: apply cross-filters, then extract distinct values.
    For routes that build enriched in-memory lists.
    Returns JSONResponse to bypass FastAPI response_model validation.
    """
    params = parseCrossFilterPagination(column, paginationJson)
    narrowed = _applyFiltersAndSort(items, params)
    distinct = _extractDistinctValues(narrowed, column, requestLang)
    return JSONResponse(content=distinct)
|
|
|
|
|
|
def handleIdsInMemory(
    items: List[Dict[str, Any]],
    paginationJson: Optional[str] = None,
    idField: str = "id",
) -> JSONResponse:
    """
    In-memory IDs: apply filters, return all IDs.
    For routes that build enriched in-memory lists.
    Returns JSONResponse to bypass FastAPI response_model validation.
    """
    params = parsePaginationForIds(paginationJson)
    filteredItems = _applyFiltersAndSort(items, params)
    ids = [
        str(entry[idField])
        for entry in filteredItems
        if entry.get(idField) is not None
    ]
    return JSONResponse(content=ids)
|
|
|
|
|
|
def getRecordsetPaginatedWithFkSort(
    db,
    modelClass: type,
    pagination,
    recordFilter: Optional[Dict[str, Any]] = None,
    labelResolvers: Optional[Dict[str, Callable[[List[str]], Dict[str, str]]]] = None,
    fieldFilter: Optional[List[str]] = None,
    idField: str = "id",
) -> Dict[str, Any]:
    """
    Wrapper around db.getRecordsetPaginated that handles FK-label sorting.

    If the current sort field is a FK with a registered labelResolver, the
    function fetches all filtered IDs + FK values, resolves labels cross-DB,
    sorts in-memory by label, and returns only the requested page.

    If no FK sort is active, delegates directly to db.getRecordsetPaginated.

    Returns the standard paginated dict: {"items", "totalItems", "totalPages"}.
    On any error, falls back to the plain paginated query.
    """
    import math

    # Fast path: no sort at all -> plain delegation.
    if not pagination or not pagination.sort:
        return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)

    # Auto-derive resolvers from the model's fk_model metadata when not given.
    if labelResolvers is None:
        labelResolvers = _buildLabelResolversFromModel(modelClass)

    if not labelResolvers:
        return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)

    # Find the first sort entry that targets a resolvable FK field.
    # Sort entries may be dicts or objects with field/direction attributes.
    fkSortField = None
    fkSortDir = "asc"
    for sf in pagination.sort:
        sfField = sf.get("field") if isinstance(sf, dict) else getattr(sf, "field", None)
        sfDir = sf.get("direction", "asc") if isinstance(sf, dict) else getattr(sf, "direction", "asc")
        if sfField and sfField in labelResolvers:
            fkSortField = sfField
            fkSortDir = str(sfDir).lower()
            break

    if not fkSortField:
        return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)

    try:
        # Distinct FK values under the current recordFilter -> the only IDs
        # whose labels we need to resolve.
        distinctIds = db.getDistinctColumnValues(
            modelClass, fkSortField, recordFilter=recordFilter,
        ) or []

        labelMap = {}
        if distinctIds:
            try:
                labelMap = labelResolvers[fkSortField](distinctIds)
            except Exception as e:
                # Best-effort: without labels we still sort by raw FK value.
                logger.warning(f"getRecordsetPaginatedWithFkSort: resolver for {fkSortField} failed: {e}")

        # Fetch ALL filtered rows (id + FK only) by neutralizing sort and
        # paging on a deep copy of the caller's pagination.
        filterOnlyPagination = copy.deepcopy(pagination)
        filterOnlyPagination.sort = []
        filterOnlyPagination.page = 1
        filterOnlyPagination.pageSize = 999999  # effectively "no limit"

        lightRows = db.getRecordsetPaginated(
            modelClass, filterOnlyPagination, recordFilter,
            fieldFilter=[idField, fkSortField],
        )
        allRows = lightRows.get("items", [])
        totalItems = len(allRows)

        if totalItems == 0:
            return {"items": [], "totalItems": 0, "totalPages": 0}

        # Sort by resolved label (case-insensitive); unresolved FKs fall
        # back to their raw value.
        def _sortKey(row):
            fkVal = row.get(fkSortField, "") or ""
            label = labelMap.get(str(fkVal), str(fkVal)).lower()
            return label

        reverse = fkSortDir == "desc"
        allRows.sort(key=_sortKey, reverse=reverse)

        # Slice the requested page out of the label-sorted ID list.
        pageSize = pagination.pageSize
        offset = (pagination.page - 1) * pageSize
        pageSlice = allRows[offset:offset + pageSize]
        pageIds = [row[idField] for row in pageSlice if row.get(idField)]

        if not pageIds:
            return {"items": [], "totalItems": totalItems, "totalPages": math.ceil(totalItems / pageSize)}

        # Fetch the full records for just this page...
        pageItems = db.getRecordset(modelClass, recordFilter={idField: pageIds}, fieldFilter=fieldFilter)

        # ...and restore the label-sorted order (the IN query does not
        # preserve it). Unknown IDs sort last.
        idOrder = {pid: idx for idx, pid in enumerate(pageIds)}
        pageItems.sort(key=lambda r: idOrder.get(r.get(idField), 999999))

        totalPages = math.ceil(totalItems / pageSize) if totalItems > 0 else 0
        return {"items": pageItems, "totalItems": totalItems, "totalPages": totalPages}

    except Exception as e:
        logger.error(f"getRecordsetPaginatedWithFkSort failed for {modelClass.__name__}: {e}")
        return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
|
|
|
|
|
|
def paginateInMemory(
    items: List[Dict[str, Any]],
    paginationParams: Optional[PaginationParams],
) -> tuple:
    """
    Apply pagination (page/pageSize slicing) to an already-filtered+sorted list.
    Returns (pageItems, totalItems). Without params, the full list is returned.
    """
    total = len(items)
    if not paginationParams:
        return items, total
    start = (paginationParams.page - 1) * paginationParams.pageSize
    end = start + paginationParams.pageSize
    return items[start:end], total