# gateway/modules/routes/routeHelpers.py
# (extraction metadata: 963 lines, 34 KiB, Python)

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Shared helpers for route handlers.
Provides unified logic for:
- mode=filterValues: distinct column values for filter dropdowns (cross-filtered)
- mode=ids: all IDs matching current filters (for bulk selection)
- In-memory equivalents for enriched/non-SQL routes
"""
import copy
import json
import logging
from typing import Any, Dict, List, Optional, Callable, Union
from fastapi.responses import JSONResponse
from modules.datamodels.datamodelPagination import (
PaginationParams,
normalize_pagination_dict,
)
from modules.shared.i18nRegistry import resolveText
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Central FK label resolvers (cross-DB)
# ---------------------------------------------------------------------------
def resolveMandateLabels(ids: List[str]) -> Dict[str, Optional[str]]:
    """Resolve mandate IDs to human-readable labels.

    Unresolvable IDs map to ``None`` (never the raw ID) so the caller can
    distinguish "resolved" from "missing".
    """
    from modules.interfaces.interfaceDbApp import getRootInterface
    mandateMap = getRootInterface().getMandatesByIds(ids)
    labels: Dict[str, Optional[str]] = {}
    for mandateId in ids:
        mandate = mandateMap.get(mandateId)
        label = None
        if mandate:
            # Prefer the explicit label, fall back to the name attribute.
            label = getattr(mandate, "label", None) or getattr(mandate, "name", None)
        if not label:
            logger.warning("resolveMandateLabels: no label for id=%s (found=%s)", mandateId, mandate is not None)
        labels[mandateId] = label or None
    return labels
def resolveInstanceLabels(ids: List[str]) -> Dict[str, Optional[str]]:
    """Resolve feature-instance IDs to labels; unresolvable entries map to None."""
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.interfaces.interfaceFeatures import getFeatureInterface
    featureIface = getFeatureInterface(getRootInterface().db)
    labels: Dict[str, Optional[str]] = {}
    for instanceId in ids:
        instance = featureIface.getFeatureInstance(instanceId)
        label = instance.label if (instance and instance.label) else None
        if not label:
            logger.warning("resolveInstanceLabels: no label for id=%s (found=%s)", instanceId, instance is not None)
        labels[instanceId] = label
    return labels
def resolveUserLabels(ids: List[str]) -> Dict[str, Optional[str]]:
    """Resolve user IDs to display names; unresolvable entries map to None."""
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.datamodels.datamodelUam import UserInDB as _UserInDB
    rootIface = getRootInterface()
    userRows = rootIface.db.getRecordset(
        _UserInDB,
        recordFilter={"id": list(set(ids))},
    ) or []
    # Index the fetched rows by id for O(1) per-ID lookup below.
    rowsById: Dict[str, dict] = {row.get("id", ""): row for row in userRows}
    labels: Dict[str, Optional[str]] = {}
    for userId in ids:
        row = rowsById.get(userId)
        if row:
            # Display-name preference: displayName > username > email.
            labels[userId] = (
                row.get("displayName") or row.get("username") or row.get("email") or None
            )
        else:
            labels[userId] = None
    return labels
def resolveRoleLabels(ids: List[str]) -> Dict[str, Optional[str]]:
    """Resolve Role.id values to roleLabel; unresolvable entries map to None."""
    if not ids:
        return {}
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.datamodels.datamodelRbac import Role as _Role
    roleRows = getRootInterface().db.getRecordset(
        _Role,
        recordFilter={"id": list(set(ids))},
    ) or []
    # Pre-seed every requested ID with None, then overwrite from the rows.
    labels: Dict[str, Optional[str]] = dict.fromkeys(ids)
    for row in roleRows:
        roleId = row.get("id")
        if roleId:
            labels[roleId] = row.get("roleLabel") or None
    for roleId in ids:
        if labels.get(roleId) is None:
            logger.warning("resolveRoleLabels: no label for id=%s", roleId)
    return labels
# Registry of FK-target model name -> label resolver. Consumed by
# `_buildLabelResolversFromModel` when a field's `fk_target.table` names one
# of these models; each resolver maps a list of IDs to {id: label-or-None}.
_BUILTIN_FK_RESOLVERS: Dict[str, Callable[[List[str]], Dict[str, str]]] = {
    "Mandate": resolveMandateLabels,
    "FeatureInstance": resolveInstanceLabels,
    "UserInDB": resolveUserLabels,
    "Role": resolveRoleLabels,
}
def _buildLabelResolversFromModel(modelClass: type) -> Dict[str, Callable[[List[str]], Dict[str, str]]]:
    """
    Derive a labelResolvers map from ``json_schema_extra.fk_target`` annotations
    on a Pydantic model.

    A field is included when its ``fk_target.labelField`` is set (non-None) and
    ``fk_target.table`` names a model with a registered builtin resolver.
    """
    out: Dict[str, Callable[[List[str]], Dict[str, str]]] = {}
    for fieldName, fieldInfo in modelClass.model_fields.items():
        extra = fieldInfo.json_schema_extra
        if not isinstance(extra, dict) or not extra:
            continue
        fkTarget = extra.get("fk_target")
        if not isinstance(fkTarget, dict):
            continue
        # labelField None means "do not resolve labels for this FK".
        if fkTarget.get("labelField") is None:
            continue
        targetModel = fkTarget.get("table")
        if targetModel and targetModel in _BUILTIN_FK_RESOLVERS:
            out[fieldName] = _BUILTIN_FK_RESOLVERS[targetModel]
    return out
def enrichRowsWithFkLabels(
    rows: List[Dict[str, Any]],
    modelClass: Optional[type] = None,
    *,
    labelResolvers: Optional[Dict[str, Callable[[List[str]], Dict[str, Optional[str]]]]] = None,
    extraResolvers: Optional[Dict[str, Callable[[List[str]], Dict[str, Optional[str]]]]] = None,
) -> List[Dict[str, Any]]:
    """Add ``{field}Label`` columns to each row for every FK field that has a
    registered resolver. Mutates ``rows`` in place and returns it.

    ``modelClass`` — if provided (and ``labelResolvers`` is not), resolvers are
    auto-built from ``fk_target`` annotations on the Pydantic model (via
    ``_buildLabelResolversFromModel``).
    ``labelResolvers`` — explicit resolver map that overrides auto-built ones.
    ``extraResolvers`` — merged on top of auto-built / explicit resolvers. Use
    for ad-hoc fields that are not FK-annotated on the model (e.g.
    ``createdByUserId`` on billing transactions).

    If a label cannot be resolved the ``{field}Label`` value is ``None``
    (never the raw ID — that would reintroduce the silent-truncation bug).
    A resolver that raises is logged and treated as "resolved nothing".
    """
    resolvers: Dict[str, Callable] = {}
    # Explicit resolvers win over auto-built ones (docstring contract).
    if labelResolvers is not None:
        resolvers = dict(labelResolvers)
    elif modelClass is not None:
        resolvers = _buildLabelResolversFromModel(modelClass)
    if extraResolvers:
        resolvers.update(extraResolvers)
    if not resolvers or not rows:
        return rows
    for field, resolver in resolvers.items():
        # Sorted for a deterministic resolver call order (was arbitrary set order).
        ids = sorted({str(r.get(field)) for r in rows if r.get(field)})
        if not ids:
            continue
        try:
            labelMap = resolver(ids)
        except Exception as e:
            logger.error("enrichRowsWithFkLabels: resolver for '%s' raised: %s", field, e)
            labelMap = {}
        labelKey = f"{field}Label"
        for r in rows:
            fkVal = r.get(field)
            r[labelKey] = labelMap.get(str(fkVal)) if fkVal else None
    return rows
# ---------------------------------------------------------------------------
# Cross-filter pagination parsing
# ---------------------------------------------------------------------------
def parseCrossFilterPagination(
    column: str,
    paginationJson: Optional[str],
) -> Optional["PaginationParams"]:
    """
    Parse pagination JSON for a filter-values request.

    Removes any filter on the requested column (cross-filtering) and drops the
    sort. Returns None on missing, empty, or unparseable input.
    """
    if not paginationJson:
        return None
    try:
        parsed = json.loads(paginationJson)
        if not parsed:
            return None
        parsed = normalize_pagination_dict(parsed)
        crossFilters = parsed.get("filters", {})
        crossFilters.pop(column, None)
        parsed["filters"] = crossFilters
        parsed.pop("sort", None)
        return PaginationParams(**parsed)
    except (json.JSONDecodeError, ValueError, TypeError):
        return None
def parsePaginationForIds(
    paginationJson: Optional[str],
) -> Optional["PaginationParams"]:
    """
    Parse pagination JSON for mode=ids.

    Keeps the filters, drops only the sort. Returns None on missing, empty,
    or unparseable input.
    """
    if not paginationJson:
        return None
    try:
        parsed = json.loads(paginationJson)
        if not parsed:
            return None
        parsed = normalize_pagination_dict(parsed)
        parsed.pop("sort", None)
        return PaginationParams(**parsed)
    except (json.JSONDecodeError, ValueError, TypeError):
        return None
# ---------------------------------------------------------------------------
# SQL-based helpers (delegate to DB connector)
# ---------------------------------------------------------------------------
def handleFilterValuesMode(
    db,
    modelClass: type,
    column: str,
    paginationJson: Optional[str] = None,
    recordFilter: Optional[Dict[str, Any]] = None,
    enrichFn: Optional[Callable[[str, Optional["PaginationParams"], Optional[Dict[str, Any]]], List[str]]] = None,
) -> "JSONResponse":
    """
    SQL-based distinct column values with cross-filtering.

    If enrichFn is provided and handles the column (computed/joined values),
    enrichFn(column, crossPagination, recordFilter) is used instead of SQL
    DISTINCT; a failing or None-returning enrichFn falls through to the SQL
    path. Always returns a JSONResponse ([] on error).

    Note: the return annotation was previously ``List[str]`` although the
    function has always returned a JSONResponse; fixed to match behavior.
    """
    crossPagination = parseCrossFilterPagination(column, paginationJson)
    if enrichFn:
        try:
            result = enrichFn(column, crossPagination, recordFilter)
            # enrichFn may return None to signal "not an enriched column".
            if result is not None:
                return JSONResponse(content=result)
        except Exception as e:
            logger.warning("handleFilterValuesMode enrichFn failed for %s: %s", column, e)
    try:
        values = db.getDistinctColumnValues(
            modelClass, column,
            pagination=crossPagination,
            recordFilter=recordFilter,
        ) or []
        return JSONResponse(content=values)
    except Exception as e:
        logger.error(
            "handleFilterValuesMode SQL failed for %s.%s: %s",
            modelClass.__name__, column, e,
        )
        return JSONResponse(content=[])
def handleIdsMode(
    db,
    modelClass: type,
    paginationJson: Optional[str] = None,
    recordFilter: Optional[Dict[str, Any]] = None,
    idField: str = "id",
) -> "JSONResponse":
    """
    Return all IDs matching the current filters (no LIMIT/OFFSET) as a
    JSONResponse of strings.

    Uses the same WHERE clause as getRecordsetPaginated so bulk selection
    matches exactly what the paginated grid shows. Returns [] when the table
    does not exist or on any error.

    Note: the return annotation was previously ``List[str]`` although the
    function has always returned a JSONResponse; fixed to match behavior.
    """
    pagination = parsePaginationForIds(paginationJson)
    table = modelClass.__name__
    try:
        if not db._ensureTableExists(modelClass):
            return JSONResponse(content=[])
        where_clause, _, _, values, _ = db._buildPaginationClauses(
            modelClass, pagination, recordFilter,
        )
        # idField/table come from model metadata (trusted callers), not user
        # input; still emitted as quoted identifiers.
        sql = f'SELECT "{idField}"::TEXT AS val FROM "{table}"{where_clause} ORDER BY "{idField}"'
        with db.connection.cursor() as cursor:
            cursor.execute(sql, values)
            return JSONResponse(content=[row["val"] for row in cursor.fetchall()])
    except Exception as e:
        logger.error("handleIdsMode failed for %s: %s", table, e)
        return JSONResponse(content=[])
# ---------------------------------------------------------------------------
# In-memory helpers (for enriched / non-SQL routes)
# ---------------------------------------------------------------------------
def applyFiltersAndSort(
    items: List[Dict[str, Any]],
    paginationParams: Optional["PaginationParams"],
) -> List[Dict[str, Any]]:
    """
    Apply filters and sorting to a list of dicts in-memory.

    Does NOT paginate (no page/pageSize slicing).

    Filter semantics:
    - "search": case-insensitive substring match across all stringified values.
    - value None: keeps only rows where the field is None or "".
    - value "": the filter is ignored.
    - dict values with "operator"/"value" dispatch to ``_matchesFilter``.

    Sort semantics: stable multi-field sort (applied in reverse order); None
    values always sort last regardless of direction; numbers/bools sort before
    strings, strings compare case-insensitively.
    """
    if not paginationParams:
        return items
    result = list(items)
    if paginationParams.filters:
        filters = paginationParams.filters
        rawSearch = filters.get("search")
        # str() guards against non-string search payloads — previously a
        # numeric search value crashed on .lower().
        searchTerm = str(rawSearch).lower() if rawSearch else None
        if searchTerm:
            result = [
                item for item in result
                if any(
                    searchTerm in str(v).lower()
                    for v in item.values()
                    if v is not None
                )
            ]
        for field, filterValue in filters.items():
            if field == "search":
                continue
            if isinstance(filterValue, dict) and "operator" in filterValue:
                operator = filterValue.get("operator", "equals")
                value = filterValue.get("value")
            else:
                operator = "equals"
                value = filterValue
            if value is None:
                # Explicit null filter: keep only empty/missing values.
                result = [
                    item for item in result
                    if item.get(field) is None or item.get(field) == ""
                ]
                continue
            if value == "":
                continue
            result = [
                item for item in result
                if _matchesFilter(item, field, operator, value)
            ]
    if paginationParams.sort:
        for sortField in reversed(paginationParams.sort):
            fieldName = sortField.field
            ascending = sortField.direction == "asc"
            # Partition so None values stay last whatever the direction.
            noneItems = [item for item in result if item.get(fieldName) is None]
            nonNoneItems = [item for item in result if item.get(fieldName) is not None]

            def _getSortKey(item: Dict[str, Any], _fn=fieldName):
                value = item.get(_fn)
                if isinstance(value, bool):
                    return (0, int(value), "")
                if isinstance(value, (int, float)):
                    return (0, value, "")
                return (1, 0, str(value).lower())

            nonNoneItems = sorted(nonNoneItems, key=_getSortKey, reverse=not ascending)
            result = nonNoneItems + noneItems
    return result
def _matchesFilter(item: Dict[str, Any], field: str, operator: str, value: Any) -> bool:
"""Single-field filter match for in-memory filtering."""
itemValue = item.get(field)
if itemValue is None:
return False
itemStr = str(itemValue).lower()
valueStr = str(value).lower()
if operator in ("equals", "eq"):
return itemStr == valueStr
if operator == "contains":
return valueStr in itemStr
if operator == "startsWith":
return itemStr.startswith(valueStr)
if operator == "endsWith":
return itemStr.endswith(valueStr)
if operator in ("gt", "gte", "lt", "lte"):
try:
itemNum = float(itemValue)
valueNum = float(value)
if operator == "gt":
return itemNum > valueNum
if operator == "gte":
return itemNum >= valueNum
if operator == "lt":
return itemNum < valueNum
return itemNum <= valueNum
except (ValueError, TypeError):
return False
if operator == "between":
return _matchesBetween(itemValue, itemStr, value)
if operator == "in":
if isinstance(value, list):
return itemStr in [str(x).lower() for x in value]
return False
if operator == "notIn":
if isinstance(value, list):
return itemStr not in [str(x).lower() for x in value]
return True
return True
def _matchesBetween(itemValue: Any, itemStr: str, value: Any) -> bool:
"""Handle 'between' operator for date ranges and numeric ranges."""
if not isinstance(value, dict):
return True
fromVal = value.get("from", "")
toVal = value.get("to", "")
if not fromVal and not toVal:
return True
try:
from datetime import datetime, timezone
fromTs = None
toTs = None
if fromVal:
fromTs = datetime.strptime(str(fromVal), "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()
if toVal:
toTs = datetime.strptime(str(toVal), "%Y-%m-%d").replace(
hour=23, minute=59, second=59, tzinfo=timezone.utc
).timestamp()
itemNum = float(itemValue) if not isinstance(itemValue, (int, float)) else itemValue
if itemNum > 10000000000:
itemNum = itemNum / 1000
if fromTs is not None and toTs is not None:
return fromTs <= itemNum <= toTs
if fromTs is not None:
return itemNum >= fromTs
if toTs is not None:
return itemNum <= toTs
except (ValueError, TypeError):
# Numeric range (e.g. FormGeneratorTable column filters on INTEGER/FLOAT)
try:
itemNum = float(itemValue)
fromNum = float(fromVal) if fromVal not in (None, "") else None
toNum = float(toVal) if toVal not in (None, "") else None
if fromNum is not None and toNum is not None:
return fromNum <= itemNum <= toNum
if fromNum is not None:
return itemNum >= fromNum
if toNum is not None:
return itemNum <= toNum
except (ValueError, TypeError):
pass
fromStr = str(fromVal).lower() if fromVal else ""
toStr = str(toVal).lower() if toVal else ""
if fromStr and toStr:
return fromStr <= itemStr <= toStr
if fromStr:
return itemStr >= fromStr
if toStr:
return itemStr <= toStr
return True
def _extractDistinctValues(
items: List[Dict[str, Any]],
columnKey: str,
requestLang: Optional[str] = None,
) -> list:
"""Extract sorted distinct display values for a column from enriched items.
When the items contain a ``{columnKey}Label`` field (FK enrichment convention),
returns ``{value, label}`` objects so the frontend shows human-readable
labels in filter dropdowns. Otherwise returns plain strings.
Includes ``None`` as the last entry when at least one row has a null/empty
value — this enables the "(Leer)" filter option in the frontend.
"""
_MISSING = object()
labelKey = f"{columnKey}Label"
hasFkLabels = any(labelKey in item for item in items[:20])
if hasFkLabels:
byVal: Dict[str, str] = {}
hasEmpty = False
for item in items:
val = item.get(columnKey, _MISSING)
if val is _MISSING:
continue
if val is None or val == "":
hasEmpty = True
continue
strVal = str(val)
if strVal not in byVal:
label = item.get(labelKey)
byVal[strVal] = str(label) if label else f"NA({strVal[:8]})"
result: list = sorted(
[{"value": v, "label": l} for v, l in byVal.items()],
key=lambda x: x["label"].lower(),
)
if hasEmpty:
result.append(None)
return result
values = set()
hasEmpty = False
for item in items:
val = item.get(columnKey, _MISSING)
if val is _MISSING:
continue
if val is None or val == "":
hasEmpty = True
continue
if isinstance(val, bool):
values.add("true" if val else "false")
elif isinstance(val, (int, float)):
values.add(str(val))
elif isinstance(val, dict):
text = resolveText(val, requestLang)
if text:
values.add(text)
else:
values.add(str(val))
result = sorted(values, key=lambda v: v.lower())
if hasEmpty:
result.append(None)
return result
def handleFilterValuesInMemory(
    items: List[Dict[str, Any]],
    column: str,
    paginationJson: Optional[str] = None,
    requestLang: Optional[str] = None,
) -> JSONResponse:
    """
    In-memory filter-values: apply cross-filters, then extract distinct values.

    For routes that build enriched in-memory lists. Returns JSONResponse to
    bypass FastAPI response_model validation.
    """
    params = parseCrossFilterPagination(column, paginationJson)
    filteredItems = applyFiltersAndSort(items, params)
    distinct = _extractDistinctValues(filteredItems, column, requestLang)
    return JSONResponse(content=distinct)
def handleIdsInMemory(
    items: List[Dict[str, Any]],
    paginationJson: Optional[str] = None,
    idField: str = "id",
) -> JSONResponse:
    """
    In-memory IDs mode: apply filters, return all matching IDs as strings.

    For routes that build enriched in-memory lists. Returns JSONResponse to
    bypass FastAPI response_model validation.
    """
    params = parsePaginationForIds(paginationJson)
    filteredItems = applyFiltersAndSort(items, params)
    ids = [
        str(item[idField])
        for item in filteredItems
        if item.get(idField) is not None
    ]
    return JSONResponse(content=ids)
def getRecordsetPaginatedWithFkSort(
    db,
    modelClass: type,
    pagination,
    recordFilter: Optional[Dict[str, Any]] = None,
    labelResolvers: Optional[Dict[str, Callable[[List[str]], Dict[str, str]]]] = None,
    fieldFilter: Optional[List[str]] = None,
    idField: str = "id",
) -> Dict[str, Any]:
    """
    Wrapper around db.getRecordsetPaginated that supports sorting by FK label.

    If the current sort targets an FK field with a registered labelResolver,
    all filtered rows are fetched lightweight (id + FK value only), labels are
    resolved cross-DB, rows are sorted in-memory by label, and only the
    requested page of full records is loaded. Otherwise (or on any error) it
    delegates to db.getRecordsetPaginated.
    """
    import math
    if not pagination or not pagination.sort:
        return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
    if labelResolvers is None:
        labelResolvers = _buildLabelResolversFromModel(modelClass)
    if not labelResolvers:
        return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
    # Pick the first sort entry that targets a resolvable FK field.
    fkSortField = None
    fkSortDir = "asc"
    for sf in pagination.sort:
        sfField = sf.get("field") if isinstance(sf, dict) else getattr(sf, "field", None)
        sfDir = sf.get("direction", "asc") if isinstance(sf, dict) else getattr(sf, "direction", "asc")
        if sfField and sfField in labelResolvers:
            fkSortField = sfField
            fkSortDir = str(sfDir).lower()
            break
    if not fkSortField:
        return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
    try:
        distinctIds = db.getDistinctColumnValues(
            modelClass, fkSortField, recordFilter=recordFilter,
        ) or []
        labelMap: Dict[str, Optional[str]] = {}
        if distinctIds:
            try:
                labelMap = labelResolvers[fkSortField](distinctIds)
            except Exception as e:
                logger.warning(f"getRecordsetPaginatedWithFkSort: resolver for {fkSortField} failed: {e}")
        # Fetch id + FK value for ALL filtered rows (sort dropped, page widened).
        filterOnlyPagination = copy.deepcopy(pagination)
        filterOnlyPagination.sort = []
        filterOnlyPagination.page = 1
        filterOnlyPagination.pageSize = 999999
        lightRows = db.getRecordsetPaginated(
            modelClass, filterOnlyPagination, recordFilter,
            fieldFilter=[idField, fkSortField],
        )
        allRows = lightRows.get("items", [])
        totalItems = len(allRows)
        if totalItems == 0:
            return {"items": [], "totalItems": 0, "totalPages": 0}

        def _sortKey(row):
            fkVal = row.get(fkSortField, "") or ""
            # BUGFIX: resolvers return None for unresolvable IDs; the previous
            # labelMap.get(key, fallback) returned that stored None and crashed
            # on .lower(). Fall back to the raw ID instead.
            label = labelMap.get(str(fkVal)) or str(fkVal)
            return label.lower()

        allRows.sort(key=_sortKey, reverse=(fkSortDir == "desc"))
        pageSize = pagination.pageSize
        offset = (pagination.page - 1) * pageSize
        pageSlice = allRows[offset:offset + pageSize]
        pageIds = [row[idField] for row in pageSlice if row.get(idField)]
        if not pageIds:
            return {"items": [], "totalItems": totalItems, "totalPages": math.ceil(totalItems / pageSize)}
        pageItems = db.getRecordset(modelClass, recordFilter={idField: pageIds}, fieldFilter=fieldFilter)
        # Restore the label-sorted order (getRecordset does not preserve it).
        idOrder = {pid: idx for idx, pid in enumerate(pageIds)}
        pageItems.sort(key=lambda r: idOrder.get(r.get(idField), 999999))
        enrichRowsWithFkLabels(pageItems, modelClass)
        totalPages = math.ceil(totalItems / pageSize) if totalItems > 0 else 0
        return {"items": pageItems, "totalItems": totalItems, "totalPages": totalPages}
    except Exception as e:
        logger.error(f"getRecordsetPaginatedWithFkSort failed for {modelClass.__name__}: {e}")
        return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
def paginateInMemory(
    items: List[Dict[str, Any]],
    paginationParams: Optional["PaginationParams"],
) -> tuple:
    """
    Slice an already filtered+sorted list to the requested page.

    Returns (pageItems, totalItems); without pagination params the full list
    is returned unchanged.
    """
    totalItems = len(items)
    if not paginationParams:
        return items, totalItems
    start = (paginationParams.page - 1) * paginationParams.pageSize
    end = start + paginationParams.pageSize
    return items[start:end], totalItems
# ---------------------------------------------------------------------------
# View resolution and Strategy B grouping engine
# ---------------------------------------------------------------------------
def resolveView(interface, contextKey: str, viewKey: Optional[str]):
    """
    Load a TableListView for the current user and contextKey.

    Returns (config_dict, display_name):
    - (None, None) when viewKey is None / empty
    - (config, str | None) otherwise — config may be {}; display_name from the row

    Raises HTTPException(404) when viewKey is explicitly set but the view
    does not exist (prevents silent fallback to ungrouped behaviour).
    """
    if not viewKey:
        return None, None
    from fastapi import HTTPException
    view = None
    try:
        view = interface.getTableListView(contextKey=contextKey, viewKey=viewKey)
    except Exception as e:
        logger.warning(f"resolveView: store lookup failed for key={viewKey!r} context={contextKey!r}: {e}")
    if view is None:
        raise HTTPException(status_code=404, detail=f"View '{viewKey}' not found for context '{contextKey}'")
    return view.config or {}, getattr(view, "displayName", None) or None
def effective_group_by_levels(
    pagination_params: Optional["PaginationParams"],
    view_config: Optional[dict],
) -> List[Dict[str, Any]]:
    """
    Pick the grouping levels for this request.

    A client-sent ``groupByLevels`` (even an explicit ``[]``) takes precedence
    over the saved view; only an omitted key (``None``) falls through to the
    view's levels.
    """
    requested = (
        getattr(pagination_params, "groupByLevels", None)
        if pagination_params is not None
        else None
    )
    if requested is not None:
        normalized: List[Dict[str, Any]] = []
        for level in requested:
            if hasattr(level, "model_dump"):
                normalized.append(level.model_dump())
            else:
                # dicts and dict-like objects alike
                normalized.append(dict(level))
        return normalized
    fromView = (view_config or {}).get("groupByLevels") if view_config else None
    return list(fromView or [])
def applyViewToParams(params: Optional["PaginationParams"], viewConfig: Optional[dict]) -> Optional["PaginationParams"]:
    """
    Merge a view's saved configuration into PaginationParams.

    Priority: explicit request fields win over view defaults.
    - sort: request sort kept when non-empty, otherwise the view's sort
    - filters: deep-merge, request filters win per-key
    - pageSize: request value kept (already set by normalize_pagination_dict)

    Returns the (mutated) params; when params is None and a viewConfig is
    given, a new minimal PaginationParams is created so callers get a valid
    object. Without a viewConfig, params is returned untouched.
    """
    if not viewConfig:
        return params
    from modules.datamodels.datamodelPagination import PaginationParams, SortField
    if params is None:
        params = PaginationParams(page=1, pageSize=25)
    # Sort: view sort only fills in when the request did not sort.
    if viewConfig.get("sort") and not params.sort:
        try:
            params.sort = [
                s if not isinstance(s, dict) else SortField(**s)
                for s in viewConfig["sort"]
            ]
        except Exception as e:
            logger.warning(f"applyViewToParams: could not parse view sort: {e}")
    # Filters: deep-merge with request filters overriding per key.
    viewFilters = viewConfig.get("filters") or {}
    if viewFilters:
        params.filters = {**viewFilters, **(params.filters or {})}
    return params
def apply_strategy_b_filters_and_sort(
    items: List[Dict[str, Any]],
    pagination_params: Optional["PaginationParams"],
    current_user: Any,
) -> List[Dict[str, Any]]:
    """
    Shared in-memory filter + sort pass for Strategy B (files/prompts/connections lists).

    Without pagination params the input is returned as a shallow copy.
    """
    if not pagination_params:
        return list(items)
    from modules.interfaces.interfaceDbManagement import ComponentObjects
    component = ComponentObjects()
    component.setUserContext(current_user)
    result = list(items)
    if pagination_params.filters:
        result = component._applyFilters(result, pagination_params.filters)
    if pagination_params.sort:
        result = component._applySorting(result, pagination_params.sort)
    return result
def build_group_summary_groups(
    items: List[Dict[str, Any]],
    field: str,
    null_label: str = "",
) -> List[Dict[str, Any]]:
    """
    Build ``{"value", "label", "totalCount"}`` entries for mode=groupSummary
    (single grouping level).

    Null/empty field values collapse into one bucket (value None, label
    ``null_label``) that always sorts last; other buckets sort
    case-insensitively by display label, which prefers the ``{field}Label``
    companion value over the raw value.
    """
    from collections import defaultdict
    NULL_KEY = "\x00NULL"
    labelField = f"{field}Label"
    counts: Dict[str, int] = defaultdict(int)
    labels: Dict[str, str] = {}
    for item in items:
        raw = item.get(field)
        if raw is None or raw == "":
            key, display = NULL_KEY, null_label
        else:
            key = str(raw)
            companion = item.get(labelField)
            if companion is not None and companion != "":
                display = str(companion)
            else:
                display = key
        counts[key] += 1
        # First occurrence wins as the display label.
        labels.setdefault(key, display)
    orderedKeys = sorted(
        counts,
        key=lambda k: (k == NULL_KEY, str(labels.get(k, k)).lower()),
    )
    return [
        {
            "value": None if key == NULL_KEY else key,
            "label": labels.get(key, key),
            "totalCount": counts[key],
        }
        for key in orderedKeys
    ]
def buildGroupLayout(
    all_items: List[Dict[str, Any]],
    groupByLevels: List[Dict[str, Any]],
    page: int,
    pageSize: int,
) -> tuple:
    """
    Apply multi-level grouping to all_items, slice to the requested page,
    and return (page_items, GroupLayout | None).

    Strategy B: grouping operates on the full filtered+sorted candidate list.
    Items are stably re-sorted by the group path so that members of the same
    group are always contiguous (preserving the existing per-group sort order
    from the caller). NOTE: ``all_items`` is sorted IN PLACE.

    Parameters
    ----------
    all_items: fully filtered and user-sorted list of row dicts.
    groupByLevels: list of {"field": str, "nullLabel": str, "direction": "asc"|"desc"} dicts.
    page, pageSize: 1-based page index and page size.

    Returns
    -------
    (page_items, GroupLayout | None)
        GroupLayout is None only when no grouping is requested; with grouping
        active it is always returned (bands may be empty on an out-of-range page).
    """
    from functools import cmp_to_key
    from modules.datamodels.datamodelPagination import GroupBand, GroupLayout
    # No grouping requested: plain page slice, no layout.
    if not groupByLevels:
        offset = (page - 1) * pageSize
        return all_items[offset:offset + pageSize], None
    levels = [lvl.get("field", "") for lvl in groupByLevels if lvl.get("field")]
    # Every level entry lacked a "field": behave as if ungrouped.
    if not levels:
        offset = (page - 1) * pageSize
        return all_items[offset:offset + pageSize], None
    nullLabels = {lvl.get("field", ""): lvl.get("nullLabel", "") for lvl in groupByLevels}
    def _path_key(item: dict) -> tuple:
        # One stringified component per level; None maps to that level's nullLabel.
        # NOTE(review): falsy non-None values (0, False, "") collapse to ""
        # via `or ""` and do NOT receive the nullLabel — confirm intended.
        return tuple(
            str(item.get(f) or "") if item.get(f) is not None else nullLabels.get(f, "")
            for f in levels
        )
    def _item_cmp(a: dict, b: dict) -> int:
        # Lexicographic compare over the path, honouring each level's direction.
        # NOTE(review): `groupByLevels[i]` is indexed by position in `levels`,
        # which skips entries without a "field" — directions can misalign when
        # such entries exist; confirm callers never send field-less levels.
        pa, pb = _path_key(a), _path_key(b)
        for i in range(len(levels)):
            if pa[i] != pb[i]:
                asc = (groupByLevels[i].get("direction") or "asc").lower() != "desc"
                if pa[i] < pb[i]:
                    return -1 if asc else 1
                return 1 if asc else -1
        return 0
    # Sort by group path (per-level asc/desc); order within same path stays stable in Py3.12+
    all_items.sort(key=cmp_to_key(_item_cmp))
    # Build global band list from the full sorted list; endIdx is exclusive.
    bands_global: List[dict] = []
    current_path: Optional[tuple] = None
    current_start = 0
    for i, item in enumerate(all_items):
        path = _path_key(item)
        if path != current_path:
            if current_path is not None:
                bands_global.append({"path": list(current_path), "startIdx": current_start, "endIdx": i})
            current_path = path
            current_start = i
    if current_path is not None:
        # Close the final open band.
        bands_global.append({"path": list(current_path), "startIdx": current_start, "endIdx": len(all_items)})
    # Slice to page
    page_start = (page - 1) * pageSize
    page_end = page_start + pageSize
    page_items = all_items[page_start:page_end]
    # Find bands that have at least one row on this page
    bands_on_page: List[GroupBand] = []
    for band in bands_global:
        # Intersect the band's global range with the page window.
        inter_start = max(band["startIdx"], page_start)
        inter_end = min(band["endIdx"], page_end)
        if inter_start >= inter_end:
            continue
        path_list = band["path"]
        bands_on_page.append(GroupBand(
            path=path_list,
            label=path_list[-1] if path_list else "",
            # Indices are page-relative for the frontend renderer.
            startRowIndex=inter_start - page_start,
            rowCount=inter_end - inter_start,
        ))
    group_layout = GroupLayout(levels=levels, bands=bands_on_page) if bands_on_page else GroupLayout(levels=levels, bands=[])
    return page_items, group_layout