# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Shared helpers for route handlers. Provides unified logic for: - mode=filterValues: distinct column values for filter dropdowns (cross-filtered) - mode=ids: all IDs matching current filters (for bulk selection) - In-memory equivalents for enriched/non-SQL routes """ import copy import json import logging from typing import Any, Dict, List, Optional, Callable, Union from fastapi.responses import JSONResponse from modules.datamodels.datamodelPagination import ( PaginationParams, normalize_pagination_dict, ) from modules.shared.i18nRegistry import resolveText logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Central FK label resolvers (cross-DB) # --------------------------------------------------------------------------- def resolveMandateLabels(ids: List[str]) -> Dict[str, Optional[str]]: """Resolve mandate IDs to labels. Returns None (not the ID!) for unresolvable entries so the caller can distinguish "resolved" from "missing". """ from modules.interfaces.interfaceDbApp import getRootInterface rootIface = getRootInterface() mMap = rootIface.getMandatesByIds(ids) result: Dict[str, Optional[str]] = {} for mid in ids: m = mMap.get(mid) label = (getattr(m, "label", None) or getattr(m, "name", None)) if m else None if not label: logger.warning("resolveMandateLabels: no label for id=%s (found=%s)", mid, m is not None) result[mid] = label or None return result def resolveInstanceLabels(ids: List[str]) -> Dict[str, Optional[str]]: """Resolve feature-instance IDs to labels. Returns None for unresolvable.""" from modules.interfaces.interfaceDbApp import getRootInterface from modules.interfaces.interfaceFeatures import getFeatureInterface rootIface = getRootInterface() featureIface = getFeatureInterface(rootIface.db) result: Dict[str, Optional[str]] = {} for iid in ids: fi = featureIface.getFeatureInstance(iid) label = fi.label if fi and fi.label else None if not label: logger.warning("resolveInstanceLabels: no label for id=%s (found=%s)", iid, fi is not None) result[iid] = label return result def resolveUserLabels(ids: List[str]) -> Dict[str, Optional[str]]: """Resolve user IDs to display names. Returns None for unresolvable.""" from modules.interfaces.interfaceDbApp import getRootInterface rootIface = getRootInterface() from modules.datamodels.datamodelUam import UserInDB as _UserInDB uniqueIds = list(set(ids)) users = rootIface.db.getRecordset( _UserInDB, recordFilter={"id": uniqueIds}, ) result: Dict[str, Optional[str]] = {} found: Dict[str, dict] = {} for u in (users or []): uid = u.get("id", "") found[uid] = u for uid in ids: u = found.get(uid) if u: result[uid] = u.get("displayName") or u.get("username") or u.get("email") or None else: result[uid] = None return result def resolveRoleLabels(ids: List[str]) -> Dict[str, Optional[str]]: """Resolve Role.id to roleLabel. Returns None for unresolvable.""" if not ids: return {} from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelRbac import Role as _Role rootIface = getRootInterface() recs = rootIface.db.getRecordset( _Role, recordFilter={"id": list(set(ids))}, ) or [] out: Dict[str, Optional[str]] = {i: None for i in ids} for r in recs: rid = r.get("id") if rid: out[rid] = r.get("roleLabel") or None for rid in ids: if out.get(rid) is None: logger.warning("resolveRoleLabels: no label for id=%s", rid) return out _BUILTIN_FK_RESOLVERS: Dict[str, Callable[[List[str]], Dict[str, str]]] = { "Mandate": resolveMandateLabels, "FeatureInstance": resolveInstanceLabels, "UserInDB": resolveUserLabels, "Role": resolveRoleLabels, } def _buildLabelResolversFromModel(modelClass: type) -> Dict[str, Callable[[List[str]], Dict[str, str]]]: """ Auto-build labelResolvers dict from ``json_schema_extra.fk_target`` on a Pydantic model. Maps field names to resolver functions when the target table has a registered builtin resolver and ``fk_target.labelField`` is set (non-None). """ resolvers: Dict[str, Callable[[List[str]], Dict[str, str]]] = {} for name, fieldInfo in modelClass.model_fields.items(): extra = fieldInfo.json_schema_extra if not extra or not isinstance(extra, dict): continue tgt = extra.get("fk_target") if not isinstance(tgt, dict): continue if tgt.get("labelField") is None: continue fkModel = tgt.get("table") if fkModel and fkModel in _BUILTIN_FK_RESOLVERS: resolvers[name] = _BUILTIN_FK_RESOLVERS[fkModel] return resolvers def enrichRowsWithFkLabels( rows: List[Dict[str, Any]], modelClass: type = None, *, labelResolvers: Optional[Dict[str, Callable[[List[str]], Dict[str, Optional[str]]]]] = None, extraResolvers: Optional[Dict[str, Callable[[List[str]], Dict[str, Optional[str]]]]] = None, ) -> List[Dict[str, Any]]: """Add ``{field}Label`` columns to each row for every FK field that has a registered resolver. ``modelClass`` — if provided, resolvers are auto-built from ``fk_target`` annotations on the Pydantic model (via ``_buildLabelResolversFromModel``). ``labelResolvers`` — explicit resolver map that overrides auto-built ones. ``extraResolvers`` — merged on top of auto-built / explicit resolvers. Use for ad-hoc fields that are not FK-annotated on the model (e.g. ``createdByUserId`` on billing transactions). If a label cannot be resolved the ``{field}Label`` value is ``None`` (never the raw ID — that would reintroduce the silent-truncation bug). """ resolvers: Dict[str, Callable] = {} if modelClass is not None and labelResolvers is None: resolvers = _buildLabelResolversFromModel(modelClass) elif labelResolvers is not None: resolvers = dict(labelResolvers) if extraResolvers: resolvers.update(extraResolvers) if not resolvers or not rows: return rows for field, resolver in resolvers.items(): ids = list({str(r.get(field)) for r in rows if r.get(field)}) if not ids: continue try: labelMap = resolver(ids) except Exception as e: logger.error("enrichRowsWithFkLabels: resolver for '%s' raised: %s", field, e) labelMap = {} labelKey = f"{field}Label" for r in rows: fkVal = r.get(field) if fkVal: r[labelKey] = labelMap.get(str(fkVal)) else: r[labelKey] = None return rows # --------------------------------------------------------------------------- # Cross-filter pagination parsing # --------------------------------------------------------------------------- def parseCrossFilterPagination( column: str, paginationJson: Optional[str], ) -> Optional[PaginationParams]: """ Parse pagination JSON, remove the requested column from filters (cross-filtering), and drop sort — used for filter-values requests. """ if not paginationJson: return None try: paginationDict = json.loads(paginationJson) if not paginationDict: return None paginationDict = normalize_pagination_dict(paginationDict) filters = paginationDict.get("filters", {}) filters.pop(column, None) paginationDict["filters"] = filters paginationDict.pop("sort", None) return PaginationParams(**paginationDict) except (json.JSONDecodeError, ValueError, TypeError): return None def parsePaginationForIds( paginationJson: Optional[str], ) -> Optional[PaginationParams]: """ Parse pagination JSON for mode=ids — keep filters, drop sort and page/pageSize. """ if not paginationJson: return None try: paginationDict = json.loads(paginationJson) if not paginationDict: return None paginationDict = normalize_pagination_dict(paginationDict) paginationDict.pop("sort", None) return PaginationParams(**paginationDict) except (json.JSONDecodeError, ValueError, TypeError): return None # --------------------------------------------------------------------------- # SQL-based helpers (delegate to DB connector) # --------------------------------------------------------------------------- def handleFilterValuesMode( db, modelClass: type, column: str, paginationJson: Optional[str] = None, recordFilter: Optional[Dict[str, Any]] = None, enrichFn: Optional[Callable[[str, Optional[PaginationParams], Optional[Dict[str, Any]]], List[str]]] = None, ) -> List[str]: """ SQL-based distinct column values with cross-filtering. If enrichFn is provided and the column is enriched (computed/joined), enrichFn(column, crossPagination, recordFilter) is called instead of SQL DISTINCT. """ crossPagination = parseCrossFilterPagination(column, paginationJson) if enrichFn: try: result = enrichFn(column, crossPagination, recordFilter) if result is not None: return JSONResponse(content=result) except Exception as e: logger.warning(f"handleFilterValuesMode enrichFn failed for {column}: {e}") try: values = db.getDistinctColumnValues( modelClass, column, pagination=crossPagination, recordFilter=recordFilter, ) or [] return JSONResponse(content=values) except Exception as e: logger.error(f"handleFilterValuesMode SQL failed for {modelClass.__name__}.{column}: {e}") return JSONResponse(content=[]) def handleIdsMode( db, modelClass: type, paginationJson: Optional[str] = None, recordFilter: Optional[Dict[str, Any]] = None, idField: str = "id", ) -> List[str]: """ Return all IDs matching the current filters (no LIMIT/OFFSET). Uses the same WHERE clause as getRecordsetPaginated. """ pagination = parsePaginationForIds(paginationJson) table = modelClass.__name__ try: if not db._ensureTableExists(modelClass): return JSONResponse(content=[]) where_clause, _, _, values, _ = db._buildPaginationClauses( modelClass, pagination, recordFilter, ) sql = f'SELECT "{idField}"::TEXT AS val FROM "{table}"{where_clause} ORDER BY "{idField}"' with db.connection.cursor() as cursor: cursor.execute(sql, values) return JSONResponse(content=[row["val"] for row in cursor.fetchall()]) except Exception as e: logger.error(f"handleIdsMode failed for {table}: {e}") return JSONResponse(content=[]) # --------------------------------------------------------------------------- # In-memory helpers (for enriched / non-SQL routes) # --------------------------------------------------------------------------- def applyFiltersAndSort( items: List[Dict[str, Any]], paginationParams: Optional[PaginationParams], ) -> List[Dict[str, Any]]: """ Apply filters and sorting to a list of dicts in-memory. Does NOT paginate (no page/pageSize slicing). """ if not paginationParams: return items result = list(items) if paginationParams.filters: filters = paginationParams.filters searchTerm = filters.get("search", "").lower() if filters.get("search") else None if searchTerm: result = [ item for item in result if any( searchTerm in str(v).lower() for v in item.values() if v is not None ) ] for field, filterValue in filters.items(): if field == "search": continue if isinstance(filterValue, dict) and "operator" in filterValue: operator = filterValue.get("operator", "equals") value = filterValue.get("value") else: operator = "equals" value = filterValue if value is None: result = [ item for item in result if item.get(field) is None or item.get(field) == "" ] continue if value == "": continue result = [ item for item in result if _matchesFilter(item, field, operator, value) ] if paginationParams.sort: for sortField in reversed(paginationParams.sort): fieldName = sortField.field ascending = sortField.direction == "asc" noneItems = [item for item in result if item.get(fieldName) is None] nonNoneItems = [item for item in result if item.get(fieldName) is not None] def _getSortKey(item: Dict[str, Any], _fn=fieldName): value = item.get(_fn) if isinstance(value, bool): return (0, int(value), "") if isinstance(value, (int, float)): return (0, value, "") return (1, 0, str(value).lower()) nonNoneItems = sorted(nonNoneItems, key=_getSortKey, reverse=not ascending) result = nonNoneItems + noneItems return result def _matchesFilter(item: Dict[str, Any], field: str, operator: str, value: Any) -> bool: """Single-field filter match for in-memory filtering.""" itemValue = item.get(field) if itemValue is None: return False itemStr = str(itemValue).lower() valueStr = str(value).lower() if operator in ("equals", "eq"): return itemStr == valueStr if operator == "contains": return valueStr in itemStr if operator == "startsWith": return itemStr.startswith(valueStr) if operator == "endsWith": return itemStr.endswith(valueStr) if operator in ("gt", "gte", "lt", "lte"): try: itemNum = float(itemValue) valueNum = float(value) if operator == "gt": return itemNum > valueNum if operator == "gte": return itemNum >= valueNum if operator == "lt": return itemNum < valueNum return itemNum <= valueNum except (ValueError, TypeError): return False if operator == "between": return _matchesBetween(itemValue, itemStr, value) if operator == "in": if isinstance(value, list): return itemStr in [str(x).lower() for x in value] return False if operator == "notIn": if isinstance(value, list): return itemStr not in [str(x).lower() for x in value] return True return True def _matchesBetween(itemValue: Any, itemStr: str, value: Any) -> bool: """Handle 'between' operator for date ranges and numeric ranges.""" if not isinstance(value, dict): return True fromVal = value.get("from", "") toVal = value.get("to", "") if not fromVal and not toVal: return True try: from datetime import datetime, timezone fromTs = None toTs = None if fromVal: fromTs = datetime.strptime(str(fromVal), "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp() if toVal: toTs = datetime.strptime(str(toVal), "%Y-%m-%d").replace( hour=23, minute=59, second=59, tzinfo=timezone.utc ).timestamp() itemNum = float(itemValue) if not isinstance(itemValue, (int, float)) else itemValue if itemNum > 10000000000: itemNum = itemNum / 1000 if fromTs is not None and toTs is not None: return fromTs <= itemNum <= toTs if fromTs is not None: return itemNum >= fromTs if toTs is not None: return itemNum <= toTs except (ValueError, TypeError): # Numeric range (e.g. FormGeneratorTable column filters on INTEGER/FLOAT) try: itemNum = float(itemValue) fromNum = float(fromVal) if fromVal not in (None, "") else None toNum = float(toVal) if toVal not in (None, "") else None if fromNum is not None and toNum is not None: return fromNum <= itemNum <= toNum if fromNum is not None: return itemNum >= fromNum if toNum is not None: return itemNum <= toNum except (ValueError, TypeError): pass fromStr = str(fromVal).lower() if fromVal else "" toStr = str(toVal).lower() if toVal else "" if fromStr and toStr: return fromStr <= itemStr <= toStr if fromStr: return itemStr >= fromStr if toStr: return itemStr <= toStr return True def _extractDistinctValues( items: List[Dict[str, Any]], columnKey: str, requestLang: Optional[str] = None, ) -> list: """Extract sorted distinct display values for a column from enriched items. When the items contain a ``{columnKey}Label`` field (FK enrichment convention), returns ``{value, label}`` objects so the frontend shows human-readable labels in filter dropdowns. Otherwise returns plain strings. Includes ``None`` as the last entry when at least one row has a null/empty value — this enables the "(Leer)" filter option in the frontend. """ _MISSING = object() labelKey = f"{columnKey}Label" hasFkLabels = any(labelKey in item for item in items[:20]) if hasFkLabels: byVal: Dict[str, str] = {} hasEmpty = False for item in items: val = item.get(columnKey, _MISSING) if val is _MISSING: continue if val is None or val == "": hasEmpty = True continue strVal = str(val) if strVal not in byVal: label = item.get(labelKey) byVal[strVal] = str(label) if label else f"NA({strVal[:8]})" result: list = sorted( [{"value": v, "label": l} for v, l in byVal.items()], key=lambda x: x["label"].lower(), ) if hasEmpty: result.append(None) return result values = set() hasEmpty = False for item in items: val = item.get(columnKey, _MISSING) if val is _MISSING: continue if val is None or val == "": hasEmpty = True continue if isinstance(val, bool): values.add("true" if val else "false") elif isinstance(val, (int, float)): values.add(str(val)) elif isinstance(val, dict): text = resolveText(val, requestLang) if text: values.add(text) else: values.add(str(val)) result = sorted(values, key=lambda v: v.lower()) if hasEmpty: result.append(None) return result def handleFilterValuesInMemory( items: List[Dict[str, Any]], column: str, paginationJson: Optional[str] = None, requestLang: Optional[str] = None, ) -> JSONResponse: """ In-memory filter-values: apply cross-filters, then extract distinct values. For routes that build enriched in-memory lists. Returns JSONResponse to bypass FastAPI response_model validation. """ crossFilterParams = parseCrossFilterPagination(column, paginationJson) crossFiltered = applyFiltersAndSort(items, crossFilterParams) return JSONResponse(content=_extractDistinctValues(crossFiltered, column, requestLang)) def handleIdsInMemory( items: List[Dict[str, Any]], paginationJson: Optional[str] = None, idField: str = "id", ) -> JSONResponse: """ In-memory IDs: apply filters, return all IDs. For routes that build enriched in-memory lists. Returns JSONResponse to bypass FastAPI response_model validation. """ pagination = parsePaginationForIds(paginationJson) filtered = applyFiltersAndSort(items, pagination) ids = [] for item in filtered: val = item.get(idField) if val is not None: ids.append(str(val)) return JSONResponse(content=ids) def getRecordsetPaginatedWithFkSort( db, modelClass: type, pagination, recordFilter: Optional[Dict[str, Any]] = None, labelResolvers: Optional[Dict[str, Callable[[List[str]], Dict[str, str]]]] = None, fieldFilter: Optional[List[str]] = None, idField: str = "id", ) -> Dict[str, Any]: """ Wrapper around db.getRecordsetPaginated that handles FK-label sorting. If the current sort field is a FK with a registered labelResolver, the function fetches all filtered IDs + FK values, resolves labels cross-DB, sorts in-memory by label, and returns only the requested page. If no FK sort is active, delegates directly to db.getRecordsetPaginated. """ import math if not pagination or not pagination.sort: return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter) if labelResolvers is None: labelResolvers = _buildLabelResolversFromModel(modelClass) if not labelResolvers: return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter) fkSortField = None fkSortDir = "asc" for sf in pagination.sort: sfField = sf.get("field") if isinstance(sf, dict) else getattr(sf, "field", None) sfDir = sf.get("direction", "asc") if isinstance(sf, dict) else getattr(sf, "direction", "asc") if sfField and sfField in labelResolvers: fkSortField = sfField fkSortDir = str(sfDir).lower() break if not fkSortField: return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter) try: distinctIds = db.getDistinctColumnValues( modelClass, fkSortField, recordFilter=recordFilter, ) or [] labelMap = {} if distinctIds: try: labelMap = labelResolvers[fkSortField](distinctIds) except Exception as e: logger.warning(f"getRecordsetPaginatedWithFkSort: resolver for {fkSortField} failed: {e}") filterOnlyPagination = copy.deepcopy(pagination) filterOnlyPagination.sort = [] filterOnlyPagination.page = 1 filterOnlyPagination.pageSize = 999999 lightRows = db.getRecordsetPaginated( modelClass, filterOnlyPagination, recordFilter, fieldFilter=[idField, fkSortField], ) allRows = lightRows.get("items", []) totalItems = len(allRows) if totalItems == 0: return {"items": [], "totalItems": 0, "totalPages": 0} def _sortKey(row): fkVal = row.get(fkSortField, "") or "" label = labelMap.get(str(fkVal), str(fkVal)).lower() return label reverse = fkSortDir == "desc" allRows.sort(key=_sortKey, reverse=reverse) pageSize = pagination.pageSize offset = (pagination.page - 1) * pageSize pageSlice = allRows[offset:offset + pageSize] pageIds = [row[idField] for row in pageSlice if row.get(idField)] if not pageIds: return {"items": [], "totalItems": totalItems, "totalPages": math.ceil(totalItems / pageSize)} pageItems = db.getRecordset(modelClass, recordFilter={idField: pageIds}, fieldFilter=fieldFilter) idOrder = {pid: idx for idx, pid in enumerate(pageIds)} pageItems.sort(key=lambda r: idOrder.get(r.get(idField), 999999)) enrichRowsWithFkLabels(pageItems, modelClass) totalPages = math.ceil(totalItems / pageSize) if totalItems > 0 else 0 return {"items": pageItems, "totalItems": totalItems, "totalPages": totalPages} except Exception as e: logger.error(f"getRecordsetPaginatedWithFkSort failed for {modelClass.__name__}: {e}") return db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter) def paginateInMemory( items: List[Dict[str, Any]], paginationParams: Optional[PaginationParams], ) -> tuple: """ Apply pagination (page/pageSize slicing) to an already-filtered+sorted list. Returns (pageItems, totalItems). """ totalItems = len(items) if not paginationParams: return items, totalItems offset = (paginationParams.page - 1) * paginationParams.pageSize pageItems = items[offset:offset + paginationParams.pageSize] return pageItems, totalItems # --------------------------------------------------------------------------- # View resolution and Strategy B grouping engine # --------------------------------------------------------------------------- def resolveView(interface, contextKey: str, viewKey: Optional[str]): """ Load a TableListView for the current user and contextKey. Returns (config_dict, display_name): - (None, None) when viewKey is None / empty - (config, str | None) otherwise — config may be {}; display_name from the row Raises HTTPException(404) when viewKey is explicitly set but the view does not exist (prevents silent fallback to ungrouped behaviour). """ from fastapi import HTTPException if not viewKey: return None, None try: view = interface.getTableListView(contextKey=contextKey, viewKey=viewKey) except Exception as e: logger.warning(f"resolveView: store lookup failed for key={viewKey!r} context={contextKey!r}: {e}") view = None if view is None: raise HTTPException(status_code=404, detail=f"View '{viewKey}' not found for context '{contextKey}'") cfg = view.config or {} dname = getattr(view, "displayName", None) or None return cfg, dname def effective_group_by_levels( pagination_params: Optional["PaginationParams"], view_config: Optional[dict], ) -> List[Dict[str, Any]]: """ Choose grouping levels for this request. If the client sends ``groupByLevels`` (including ``[]``), it wins over the saved view. If the key is omitted (``None``), use the view's levels. """ if pagination_params is not None: req = getattr(pagination_params, "groupByLevels", None) if req is not None: out: List[Dict[str, Any]] = [] for lvl in req: if hasattr(lvl, "model_dump"): out.append(lvl.model_dump()) elif isinstance(lvl, dict): out.append(dict(lvl)) else: out.append(dict(lvl)) # type: ignore[arg-type] return out vc = (view_config or {}).get("groupByLevels") if view_config else None return list(vc or []) def applyViewToParams(params: Optional["PaginationParams"], viewConfig: Optional[dict]) -> Optional["PaginationParams"]: """ Merge a view's saved configuration into PaginationParams. Priority: explicit request fields win over view defaults. - sort: use request sort if non-empty, otherwise view sort - filters: deep-merge (request filters win per-key) - pageSize: use request value (already set by normalize_pagination_dict) Returns the (mutated) params, or a new minimal PaginationParams when params is None (so callers always get a valid object). """ from modules.datamodels.datamodelPagination import PaginationParams, SortField if not viewConfig: return params if params is None: params = PaginationParams(page=1, pageSize=25) # Sort: request wins if non-empty if not params.sort and viewConfig.get("sort"): try: params.sort = [ SortField(**s) if isinstance(s, dict) else s for s in viewConfig["sort"] ] except Exception as e: logger.warning(f"applyViewToParams: could not parse view sort: {e}") # Filters: deep-merge (request filters take priority per-key) viewFilters = viewConfig.get("filters") or {} if viewFilters: merged = dict(viewFilters) if params.filters: merged.update(params.filters) params.filters = merged return params def apply_strategy_b_filters_and_sort( items: List[Dict[str, Any]], pagination_params: Optional[PaginationParams], current_user: Any, ) -> List[Dict[str, Any]]: """ Shared in-memory filter + sort pass for Strategy B (files/prompts/connections lists). """ if not pagination_params: return list(items) from modules.interfaces.interfaceDbManagement import ComponentObjects comp = ComponentObjects() comp.setUserContext(current_user) out = list(items) if pagination_params.filters: out = comp._applyFilters(out, pagination_params.filters) if pagination_params.sort: out = comp._applySorting(out, pagination_params.sort) return out def build_group_summary_groups( items: List[Dict[str, Any]], field: str, null_label: str = "—", ) -> List[Dict[str, Any]]: """ Build {"value", "label", "totalCount"} for mode=groupSummary (single grouping level). """ from collections import defaultdict counts: Dict[str, int] = defaultdict(int) display_by_key: Dict[str, str] = {} null_key = "\x00NULL" label_attr = f"{field}Label" for item in items: raw = item.get(field) if raw is None or raw == "": nk = null_key display = null_label else: nk = str(raw) display = None lbl = item.get(label_attr) if lbl is not None and lbl != "": display = str(lbl) if display is None: display = nk counts[nk] += 1 if nk not in display_by_key: display_by_key[nk] = display ordered_keys = sorted( counts.keys(), key=lambda x: (x == null_key, str(display_by_key.get(x, x)).lower()), ) return [ { "value": None if nk == null_key else nk, "label": display_by_key.get(nk, nk), "totalCount": counts[nk], } for nk in ordered_keys ] def buildGroupLayout( all_items: List[Dict[str, Any]], groupByLevels: List[Dict[str, Any]], page: int, pageSize: int, ) -> tuple: """ Apply multi-level grouping to all_items, slice to the requested page, and return (page_items, GroupLayout | None). Strategy B: grouping operates on the full filtered+sorted candidate list. Items are stably re-sorted by the group path so that members of the same group are always contiguous (preserving the existing per-group sort order from the caller). Parameters ---------- all_items: fully filtered and user-sorted list of row dicts. groupByLevels: list of {"field": str, "nullLabel": str, "direction": "asc"|"desc"} dicts. page, pageSize: 1-based page index and page size. Returns ------- (page_items, GroupLayout | None) """ from functools import cmp_to_key from modules.datamodels.datamodelPagination import GroupBand, GroupLayout if not groupByLevels: offset = (page - 1) * pageSize return all_items[offset:offset + pageSize], None levels = [lvl.get("field", "") for lvl in groupByLevels if lvl.get("field")] if not levels: offset = (page - 1) * pageSize return all_items[offset:offset + pageSize], None nullLabels = {lvl.get("field", ""): lvl.get("nullLabel", "—") for lvl in groupByLevels} def _path_key(item: dict) -> tuple: return tuple( str(item.get(f) or "") if item.get(f) is not None else nullLabels.get(f, "—") for f in levels ) def _item_cmp(a: dict, b: dict) -> int: pa, pb = _path_key(a), _path_key(b) for i in range(len(levels)): if pa[i] != pb[i]: asc = (groupByLevels[i].get("direction") or "asc").lower() != "desc" if pa[i] < pb[i]: return -1 if asc else 1 return 1 if asc else -1 return 0 # Sort by group path (per-level asc/desc); order within same path stays stable in Py3.12+ all_items.sort(key=cmp_to_key(_item_cmp)) # Build global band list from the full sorted list bands_global: List[dict] = [] current_path: Optional[tuple] = None current_start = 0 for i, item in enumerate(all_items): path = _path_key(item) if path != current_path: if current_path is not None: bands_global.append({"path": list(current_path), "startIdx": current_start, "endIdx": i}) current_path = path current_start = i if current_path is not None: bands_global.append({"path": list(current_path), "startIdx": current_start, "endIdx": len(all_items)}) # Slice to page page_start = (page - 1) * pageSize page_end = page_start + pageSize page_items = all_items[page_start:page_end] # Find bands that have at least one row on this page bands_on_page: List[GroupBand] = [] for band in bands_global: inter_start = max(band["startIdx"], page_start) inter_end = min(band["endIdx"], page_end) if inter_start >= inter_end: continue path_list = band["path"] bands_on_page.append(GroupBand( path=path_list, label=path_list[-1] if path_list else "—", startRowIndex=inter_start - page_start, rowCount=inter_end - inter_start, )) group_layout = GroupLayout(levels=levels, bands=bands_on_page) if bands_on_page else GroupLayout(levels=levels, bands=[]) return page_items, group_layout