platform-core/modules/interfaces/interfaceTableHelpers.py
ValueOn AG 4a60086c80
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 15s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
cp adapted to 2026 poweron
2026-06-09 09:53:31 +02:00

330 lines
11 KiB
Python

# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
Table/list presentation helpers: view resolution, grouping, Strategy B.
These helpers orchestrate how paginated table data is grouped, filtered
and sorted according to saved TableListView configurations.
"""
import logging
from collections import defaultdict
from functools import cmp_to_key
from typing import Any, Dict, List, Optional
from modules.datamodels.datamodelPagination import PaginationParams
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# View resolution
# ---------------------------------------------------------------------------
def resolveView(interface, contextKey: str, viewKey: Optional[str]):
"""
Load a TableListView for the current user and contextKey.
Returns (config_dict, display_name):
- (None, None) when viewKey is None / empty
- (config, str | None) otherwise — config may be {}; display_name from the row
Raises HTTPException(404) when viewKey is explicitly set but the view
does not exist (prevents silent fallback to ungrouped behaviour).
"""
from fastapi import HTTPException
if not viewKey:
return None, None
try:
view = interface.getTableListView(contextKey=contextKey, viewKey=viewKey)
except Exception as e:
logger.warning(f"resolveView: store lookup failed for key={viewKey!r} context={contextKey!r}: {e}")
view = None
if view is None:
raise HTTPException(status_code=404, detail=f"View '{viewKey}' not found for context '{contextKey}'")
cfg = view.config or {}
dname = getattr(view, "displayName", None) or None
return cfg, dname
def effective_group_by_levels(
pagination_params: Optional["PaginationParams"],
view_config: Optional[dict],
) -> List[Dict[str, Any]]:
"""
Choose grouping levels for this request.
If the client sends ``groupByLevels`` (including ``[]``), it wins over the
saved view. If the key is omitted (``None``), use the view's levels.
"""
if pagination_params is not None:
req = getattr(pagination_params, "groupByLevels", None)
if req is not None:
out: List[Dict[str, Any]] = []
for lvl in req:
if hasattr(lvl, "model_dump"):
out.append(lvl.model_dump())
elif isinstance(lvl, dict):
out.append(dict(lvl))
else:
out.append(dict(lvl)) # type: ignore[arg-type]
return out
vc = (view_config or {}).get("groupByLevels") if view_config else None
return list(vc or [])
def applyViewToParams(params: Optional["PaginationParams"], viewConfig: Optional[dict]) -> Optional["PaginationParams"]:
"""
Merge a view's saved configuration into PaginationParams.
Priority: explicit request fields win over view defaults.
- sort: use request sort if non-empty, otherwise view sort
- filters: deep-merge (request filters win per-key)
- pageSize: use request value (already set by normalize_pagination_dict)
Returns the (mutated) params, or a new minimal PaginationParams when
params is None (so callers always get a valid object).
"""
from modules.datamodels.datamodelPagination import SortField
if not viewConfig:
return params
if params is None:
params = PaginationParams(page=1, pageSize=25)
if not params.sort and viewConfig.get("sort"):
try:
params.sort = [
SortField(**s) if isinstance(s, dict) else s
for s in viewConfig["sort"]
]
except Exception as e:
logger.warning(f"applyViewToParams: could not parse view sort: {e}")
viewFilters = viewConfig.get("filters") or {}
if viewFilters:
merged = dict(viewFilters)
if params.filters:
merged.update(params.filters)
params.filters = merged
return params
def apply_strategy_b_filters_and_sort(
items: List[Dict[str, Any]],
pagination_params: Optional[PaginationParams],
current_user: Any,
) -> List[Dict[str, Any]]:
"""
Shared in-memory filter + sort pass for Strategy B (files/prompts/connections lists).
"""
if not pagination_params:
return list(items)
from modules.interfaces.interfaceDbManagement import ComponentObjects
comp = ComponentObjects()
comp.setUserContext(current_user)
out = list(items)
if pagination_params.filters:
out = comp._applyFilters(out, pagination_params.filters)
if pagination_params.sort:
out = comp._applySorting(out, pagination_params.sort)
return out
def build_group_summary_groups(
items: List[Dict[str, Any]],
field: str,
null_label: str = "\u2014",
groupByLevels: List[Dict[str, Any]] | None = None,
) -> List[Dict[str, Any]]:
"""
Build {"value", "label", "totalCount"} summaries for mode=groupSummary.
When *groupByLevels* contains more than one level the function produces one
entry per unique combination of all level values (flat permutations).
``value`` becomes a ``///``-joined composite key and ``label`` the ``/``-joined
human-readable label so the frontend can split them back.
"""
fields: list[dict] = []
if groupByLevels and len(groupByLevels) > 1:
for lvl in groupByLevels:
f = lvl.get("field", "")
nl = str(lvl.get("nullLabel") or null_label)
if f:
fields.append({"field": f, "nullLabel": nl})
if not fields:
fields = [{"field": field, "nullLabel": null_label}]
nullKey = "\x00NULL"
if len(fields) == 1:
f = fields[0]["field"]
nl = fields[0]["nullLabel"]
counts: Dict[str, int] = defaultdict(int)
displayByKey: Dict[str, str] = {}
labelAttr = f"{f}Label"
for item in items:
raw = item.get(f)
if raw is None or raw == "":
nk = nullKey
display = nl
else:
nk = str(raw)
display = None
lbl = item.get(labelAttr)
if lbl is not None and lbl != "":
display = str(lbl)
if display is None:
display = nk
counts[nk] += 1
if nk not in displayByKey:
displayByKey[nk] = display
orderedKeys = sorted(
counts.keys(),
key=lambda x: (x == nullKey, str(displayByKey.get(x, x)).lower()),
)
return [
{
"value": None if nk == nullKey else nk,
"label": displayByKey.get(nk, nk),
"totalCount": counts[nk],
}
for nk in orderedKeys
]
counts = defaultdict(int)
displayByComposite: Dict[str, list] = {}
filtersByComposite: Dict[str, dict] = {}
for item in items:
parts: list[str] = []
labels: list[str] = []
filterMap: dict = {}
for fd in fields:
f = fd["field"]
nl = fd["nullLabel"]
labelAttr = f"{f}Label"
raw = item.get(f)
if raw is None or raw == "":
parts.append(nullKey)
labels.append(nl)
filterMap[f] = None
else:
parts.append(str(raw))
lbl = item.get(labelAttr)
labels.append(str(lbl) if lbl not in (None, "") else str(raw))
filterMap[f] = str(raw)
compositeKey = "///".join(parts)
counts[compositeKey] += 1
if compositeKey not in displayByComposite:
displayByComposite[compositeKey] = labels
filtersByComposite[compositeKey] = filterMap
orderedKeys = sorted(
counts.keys(),
key=lambda x: tuple(
(seg == nullKey, seg.lower()) for seg in x.split("///")
),
)
return [
{
"value": ck.replace(nullKey, "__null__") if nullKey in ck else ck,
"label": " / ".join(displayByComposite[ck]),
"totalCount": counts[ck],
"filters": filtersByComposite[ck],
}
for ck in orderedKeys
]
def buildGroupLayout(
all_items: List[Dict[str, Any]],
groupByLevels: List[Dict[str, Any]],
page: int,
pageSize: int,
) -> tuple:
"""
Apply multi-level grouping to all_items, slice to the requested page,
and return (page_items, GroupLayout | None).
Strategy B: grouping operates on the full filtered+sorted candidate list.
Items are stably re-sorted by the group path so that members of the same
group are always contiguous (preserving the existing per-group sort order
from the caller).
Parameters
----------
all_items: fully filtered and user-sorted list of row dicts.
groupByLevels: list of {"field": str, "nullLabel": str, "direction": "asc"|"desc"} dicts.
page, pageSize: 1-based page index and page size.
Returns
-------
(page_items, GroupLayout | None)
"""
from modules.datamodels.datamodelPagination import GroupBand, GroupLayout
if not groupByLevels:
offset = (page - 1) * pageSize
return all_items[offset:offset + pageSize], None
levels = [lvl.get("field", "") for lvl in groupByLevels if lvl.get("field")]
if not levels:
offset = (page - 1) * pageSize
return all_items[offset:offset + pageSize], None
nullLabels = {lvl.get("field", ""): lvl.get("nullLabel", "\u2014") for lvl in groupByLevels}
def _path_key(item: dict) -> tuple:
return tuple(
str(item.get(f) or "") if item.get(f) is not None else nullLabels.get(f, "\u2014")
for f in levels
)
def _item_cmp(a: dict, b: dict) -> int:
pa, pb = _path_key(a), _path_key(b)
for i in range(len(levels)):
if pa[i] != pb[i]:
asc = (groupByLevels[i].get("direction") or "asc").lower() != "desc"
if pa[i] < pb[i]:
return -1 if asc else 1
return 1 if asc else -1
return 0
all_items.sort(key=cmp_to_key(_item_cmp))
bands_global: List[dict] = []
current_path: Optional[tuple] = None
current_start = 0
for i, item in enumerate(all_items):
path = _path_key(item)
if path != current_path:
if current_path is not None:
bands_global.append({"path": list(current_path), "startIdx": current_start, "endIdx": i})
current_path = path
current_start = i
if current_path is not None:
bands_global.append({"path": list(current_path), "startIdx": current_start, "endIdx": len(all_items)})
page_start = (page - 1) * pageSize
page_end = page_start + pageSize
page_items = all_items[page_start:page_end]
bands_on_page: List[GroupBand] = []
for band in bands_global:
inter_start = max(band["startIdx"], page_start)
inter_end = min(band["endIdx"], page_end)
if inter_start >= inter_end:
continue
path_list = band["path"]
bands_on_page.append(GroupBand(
path=path_list,
label=path_list[-1] if path_list else "\u2014",
startRowIndex=inter_start - page_start,
rowCount=inter_end - inter_start,
))
group_layout = GroupLayout(levels=levels, bands=bands_on_page) if bands_on_page else GroupLayout(levels=levels, bands=[])
return page_items, group_layout