# Listing metadata: 330 lines, 11 KiB, Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Table/list presentation helpers: view resolution, grouping, Strategy B.

These helpers orchestrate how paginated table data is grouped, filtered
and sorted according to saved TableListView configurations.
|
|
"""
|
|
|
|
import logging
|
|
from collections import defaultdict
|
|
from functools import cmp_to_key
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from modules.datamodels.datamodelPagination import PaginationParams
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# View resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def resolveView(interface, contextKey: str, viewKey: Optional[str]):
    """
    Look up a saved TableListView through ``interface.getTableListView``.

    Parameters
    ----------
    interface: object exposing ``getTableListView(contextKey=..., viewKey=...)``.
    contextKey: context the view belongs to.
    viewKey: key of the requested view; ``None`` / empty means "no view".

    Returns
    -------
    (config, display_name)
        - ``(None, None)`` when ``viewKey`` is falsy.
        - otherwise the view's ``config`` (``{}`` when it is falsy) and its
          ``displayName`` attribute (``None`` when missing or falsy).

    Raises
    ------
    HTTPException(404)
        When ``viewKey`` is set but the lookup returns ``None``. A lookup
        that raises is logged as a warning and reported as 404 as well, so
        an explicitly requested view never silently degrades to "no view".
    """
    from fastapi import HTTPException

    if not viewKey:
        return None, None

    stored_view = None
    try:
        stored_view = interface.getTableListView(contextKey=contextKey, viewKey=viewKey)
    except Exception as lookup_error:
        # Best effort: a failing store is treated like a missing view.
        logger.warning(f"resolveView: store lookup failed for key={viewKey!r} context={contextKey!r}: {lookup_error}")

    if stored_view is None:
        raise HTTPException(status_code=404, detail=f"View '{viewKey}' not found for context '{contextKey}'")

    config = stored_view.config if stored_view.config else {}
    display_name = getattr(stored_view, "displayName", None)
    if not display_name:
        display_name = None
    return config, display_name
|
|
|
|
|
|
def effective_group_by_levels(
    pagination_params: Optional["PaginationParams"],
    view_config: Optional[dict],
) -> List[Dict[str, Any]]:
    """
    Choose the grouping levels that apply to this request.

    If the client sends ``groupByLevels`` (including ``[]``), it wins over the
    saved view. If the attribute is missing or ``None``, the view's
    ``groupByLevels`` are used; with no view levels the result is ``[]``.

    Parameters
    ----------
    pagination_params: request parameters, or ``None``. Only its
        ``groupByLevels`` attribute is read.
    view_config: saved view configuration dict, or ``None``.

    Returns
    -------
    A new list. Levels that are dicts (or expose ``model_dump``) are returned
    as fresh dicts in both cases, so callers may mutate the result without
    touching the request object or the saved view configuration.
    """
    requested = getattr(pagination_params, "groupByLevels", None)
    if requested is not None:
        # Request levels may be model instances or plain mappings.
        return [
            lvl.model_dump() if hasattr(lvl, "model_dump") else dict(lvl)
            for lvl in requested
        ]

    saved_levels = (view_config or {}).get("groupByLevels") or []
    # Copy dict levels so the saved view config is never aliased; anything
    # else is passed through untouched, as before.
    return [dict(lvl) if isinstance(lvl, dict) else lvl for lvl in saved_levels]
|
|
|
|
|
|
def applyViewToParams(params: Optional["PaginationParams"], viewConfig: Optional[dict]) -> Optional["PaginationParams"]:
    """
    Fold a view's saved configuration into the request's PaginationParams.

    Explicit request fields take precedence over view defaults:
    - sort: the view's ``sort`` is applied only when the request has none;
      dict entries are converted to ``SortField``, other entries are kept.
      If conversion fails, a warning is logged and the sort is left as is.
    - filters: shallow merge of the two mappings; on a key clash the
      request's value replaces the view's value.

    Returns
    -------
    ``params`` unchanged (possibly ``None``) when ``viewConfig`` is empty.
    Otherwise the mutated ``params``; when ``params`` was ``None`` a new
    ``PaginationParams(page=1, pageSize=25)`` is created first.
    """
    from modules.datamodels.datamodelPagination import SortField

    if not viewConfig:
        return params

    if params is None:
        params = PaginationParams(page=1, pageSize=25)

    saved_sort = viewConfig.get("sort")
    if not params.sort and saved_sort:
        try:
            converted = []
            for entry in saved_sort:
                if isinstance(entry, dict):
                    entry = SortField(**entry)
                converted.append(entry)
            params.sort = converted
        except Exception as parse_error:
            logger.warning(f"applyViewToParams: could not parse view sort: {parse_error}")

    saved_filters = viewConfig.get("filters") or {}
    if not saved_filters:
        return params

    combined = dict(saved_filters)
    request_filters = params.filters
    if request_filters:
        # Request values overwrite view values key by key.
        combined.update(request_filters)
    params.filters = combined
    return params
|
|
|
|
|
|
def apply_strategy_b_filters_and_sort(
    items: List[Dict[str, Any]],
    pagination_params: Optional[PaginationParams],
    current_user: Any,
) -> List[Dict[str, Any]]:
    """
    Shared in-memory filter + sort pass for Strategy B (files/prompts/connections lists).

    Without pagination params a plain copy of *items* is returned. Otherwise
    a ``ComponentObjects`` instance is given *current_user* as its user
    context and its ``_applyFilters`` / ``_applySorting`` helpers are applied,
    in that order, each only when the matching params field is truthy.
    The input list itself is never rebound; work happens on a copy.
    """
    if not pagination_params:
        return list(items)

    # Imported lazily, as in the rest of this module.
    from modules.interfaces.interfaceDbManagement import ComponentObjects

    db_helper = ComponentObjects()
    db_helper.setUserContext(current_user)

    rows = list(items)

    active_filters = pagination_params.filters
    if active_filters:
        rows = db_helper._applyFilters(rows, active_filters)

    sort_spec = pagination_params.sort
    if sort_spec:
        rows = db_helper._applySorting(rows, sort_spec)

    return rows
|
|
|
|
|
|
def build_group_summary_groups(
|
|
items: List[Dict[str, Any]],
|
|
field: str,
|
|
null_label: str = "\u2014",
|
|
groupByLevels: List[Dict[str, Any]] | None = None,
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Build {"value", "label", "totalCount"} summaries for mode=groupSummary.
|
|
|
|
When *groupByLevels* contains more than one level the function produces one
|
|
entry per unique combination of all level values (flat permutations).
|
|
``value`` becomes a ``///``-joined composite key and ``label`` the ``/``-joined
|
|
human-readable label so the frontend can split them back.
|
|
"""
|
|
|
|
fields: list[dict] = []
|
|
if groupByLevels and len(groupByLevels) > 1:
|
|
for lvl in groupByLevels:
|
|
f = lvl.get("field", "")
|
|
nl = str(lvl.get("nullLabel") or null_label)
|
|
if f:
|
|
fields.append({"field": f, "nullLabel": nl})
|
|
if not fields:
|
|
fields = [{"field": field, "nullLabel": null_label}]
|
|
|
|
nullKey = "\x00NULL"
|
|
|
|
if len(fields) == 1:
|
|
f = fields[0]["field"]
|
|
nl = fields[0]["nullLabel"]
|
|
counts: Dict[str, int] = defaultdict(int)
|
|
displayByKey: Dict[str, str] = {}
|
|
labelAttr = f"{f}Label"
|
|
for item in items:
|
|
raw = item.get(f)
|
|
if raw is None or raw == "":
|
|
nk = nullKey
|
|
display = nl
|
|
else:
|
|
nk = str(raw)
|
|
display = None
|
|
lbl = item.get(labelAttr)
|
|
if lbl is not None and lbl != "":
|
|
display = str(lbl)
|
|
if display is None:
|
|
display = nk
|
|
counts[nk] += 1
|
|
if nk not in displayByKey:
|
|
displayByKey[nk] = display
|
|
orderedKeys = sorted(
|
|
counts.keys(),
|
|
key=lambda x: (x == nullKey, str(displayByKey.get(x, x)).lower()),
|
|
)
|
|
return [
|
|
{
|
|
"value": None if nk == nullKey else nk,
|
|
"label": displayByKey.get(nk, nk),
|
|
"totalCount": counts[nk],
|
|
}
|
|
for nk in orderedKeys
|
|
]
|
|
|
|
counts = defaultdict(int)
|
|
displayByComposite: Dict[str, list] = {}
|
|
filtersByComposite: Dict[str, dict] = {}
|
|
for item in items:
|
|
parts: list[str] = []
|
|
labels: list[str] = []
|
|
filterMap: dict = {}
|
|
for fd in fields:
|
|
f = fd["field"]
|
|
nl = fd["nullLabel"]
|
|
labelAttr = f"{f}Label"
|
|
raw = item.get(f)
|
|
if raw is None or raw == "":
|
|
parts.append(nullKey)
|
|
labels.append(nl)
|
|
filterMap[f] = None
|
|
else:
|
|
parts.append(str(raw))
|
|
lbl = item.get(labelAttr)
|
|
labels.append(str(lbl) if lbl not in (None, "") else str(raw))
|
|
filterMap[f] = str(raw)
|
|
compositeKey = "///".join(parts)
|
|
counts[compositeKey] += 1
|
|
if compositeKey not in displayByComposite:
|
|
displayByComposite[compositeKey] = labels
|
|
filtersByComposite[compositeKey] = filterMap
|
|
|
|
orderedKeys = sorted(
|
|
counts.keys(),
|
|
key=lambda x: tuple(
|
|
(seg == nullKey, seg.lower()) for seg in x.split("///")
|
|
),
|
|
)
|
|
return [
|
|
{
|
|
"value": ck.replace(nullKey, "__null__") if nullKey in ck else ck,
|
|
"label": " / ".join(displayByComposite[ck]),
|
|
"totalCount": counts[ck],
|
|
"filters": filtersByComposite[ck],
|
|
}
|
|
for ck in orderedKeys
|
|
]
|
|
|
|
|
|
def buildGroupLayout(
    all_items: List[Dict[str, Any]],
    groupByLevels: List[Dict[str, Any]],
    page: int,
    pageSize: int,
) -> tuple:
    """
    Apply multi-level grouping to all_items, slice to the requested page,
    and return (page_items, GroupLayout | None).

    Strategy B: grouping operates on the full filtered+sorted candidate list.
    Items are stably re-sorted by their group path so that members of the
    same group are contiguous while keeping the caller's order inside each
    group. NOTE: ``all_items`` is reordered in place.

    Group path
    ----------
    For every level with a non-empty ``field`` the row value is turned into a
    string. ``None`` and ``""`` map to the level's ``nullLabel`` (default
    ``"\u2014"``, also used when ``nullLabel`` is missing or ``None``); other
    values, including ``0`` and ``False``, map to ``str(value)``. Paths are
    compared as plain strings, level by level, honouring each level's own
    ``direction``.

    Parameters
    ----------
    all_items: fully filtered and user-sorted list of row dicts.
    groupByLevels: list of {"field": str, "nullLabel": str, "direction": "asc"|"desc"} dicts.
        Levels without a ``field`` are ignored.
    page, pageSize: 1-based page index and page size.

    Returns
    -------
    (page_items, GroupLayout | None)
        ``None`` is returned as layout when no level names a field; in that
        case the items are only sliced, not reordered. Otherwise the layout
        lists the bands that intersect the page, with ``startRowIndex``
        relative to the first row of the page.
    """
    from modules.datamodels.datamodelPagination import GroupBand, GroupLayout

    page_start = (page - 1) * pageSize
    page_end = page_start + pageSize

    # Keep the level dicts themselves (not just the field names) so that the
    # null label and direction always belong to the field they were given for.
    active_levels = [lvl for lvl in (groupByLevels or []) if lvl.get("field")]
    if not active_levels:
        return all_items[page_start:page_end], None

    levels = [lvl["field"] for lvl in active_levels]
    null_labels = [str(lvl.get("nullLabel") or "\u2014") for lvl in active_levels]
    descending = [
        (lvl.get("direction") or "asc").lower() == "desc" for lvl in active_levels
    ]

    def _path_key(item: dict) -> tuple:
        parts = []
        for field_name, fallback in zip(levels, null_labels):
            value = item.get(field_name)
            # Only None / "" count as "no value"; 0 and False are real values.
            parts.append(fallback if value is None or value == "" else str(value))
        return tuple(parts)

    def _compare(left: tuple, right: tuple) -> int:
        for idx, (a, b) in enumerate(zip(left[0], right[0])):
            if a != b:
                ascending_result = -1 if a < b else 1
                return -ascending_result if descending[idx] else ascending_result
        return 0

    # Compute each row's path once instead of once per comparison.
    keyed = [(_path_key(item), item) for item in all_items]
    keyed.sort(key=cmp_to_key(_compare))
    all_items[:] = [item for _, item in keyed]

    # Collapse runs of identical paths into (path, start, end) bands, end exclusive.
    bands_global: List[tuple] = []
    run_start = 0
    total = len(keyed)
    for idx in range(1, total + 1):
        if idx == total or keyed[idx][0] != keyed[run_start][0]:
            bands_global.append((keyed[run_start][0], run_start, idx))
            run_start = idx

    page_items = all_items[page_start:page_end]

    bands_on_page: List[GroupBand] = []
    for path, band_start, band_end in bands_global:
        visible_start = max(band_start, page_start)
        visible_end = min(band_end, page_end)
        if visible_start >= visible_end:
            continue
        bands_on_page.append(GroupBand(
            path=list(path),
            label=path[-1],
            startRowIndex=visible_start - page_start,
            rowCount=visible_end - visible_start,
        ))

    return page_items, GroupLayout(levels=levels, bands=bands_on_page)
|