# Source: platform-core/tests/unit/services/test_buildTree.py
# Snapshot: 2026-05-19 16:48:01 +02:00 (359 lines, 15 KiB, Python)

"""Unit tests for the generic UDB tree builder.
Verifies key encoding/decoding and that children for parent keys with
existing handlers (top-level, conn, mgrp, feat) are produced with the
correct effective-flag triplet.
"""
from __future__ import annotations
import asyncio
import unittest
from unittest.mock import MagicMock, patch
from modules.serviceCenter.services.serviceKnowledge import _buildTree
class TestKeyCoding(unittest.TestCase):
    """Checks for the tree-key codec helpers ``_encode`` and ``_decode``."""

    def test_encode_decode_roundtrip(self):
        """A key produced by ``_encode`` decodes back to its kind and parts."""
        expectedParts = ["conn-1", "sharepointFolder", "/sites/x"]
        encodedKey = _buildTree._encode("ds", *expectedParts)

        decodedKind, decodedParts = _buildTree._decode(encodedKey)

        self.assertEqual(decodedKind, "ds")
        self.assertEqual(decodedParts, expectedParts)

    def test_top_level_kinds(self):
        """Decoded keys carry the kind at index 0 and the parts at index 1."""
        connDecoded = _buildTree._decode("conn|abc")
        self.assertEqual(connDecoded[0], "conn")

        mandateGroupDecoded = _buildTree._decode("mgrp|m1")
        self.assertEqual(mandateGroupDecoded[0], "mgrp")

        featureDecoded = _buildTree._decode("feat|m1|trustee|fi-1")
        self.assertEqual(featureDecoded[1], ["m1", "trustee", "fi-1"])
class TestEffectiveTriplets(unittest.TestCase):
    """Checks for the effective neutralize / scope / RAG-index triplet helpers."""

    def test_ds_triplet_no_record_returns_defaults(self):
        """With an empty record list the data-source triplet equals the defaults."""
        expectedDefaults = {
            "effectiveNeutralize": False,
            "effectiveScope": "personal",
            "effectiveRagIndexEnabled": False,
        }

        triplet = _buildTree._effectiveTripletDs("c", "msft", "/", [])

        self.assertEqual(triplet, expectedDefaults)

    def test_ds_triplet_inherits_from_root(self):
        """A root-path record's flags are expected on a deeper path of the same connection."""
        rootRecord = {
            "id": "r",
            "connectionId": "c",
            "sourceType": "msft",
            "path": "/",
            "neutralize": True,
            "scope": "mandate",
            "ragIndexEnabled": True,
        }

        triplet = _buildTree._effectiveTripletDs(
            "c", "sharepointFolder", "/sites/x", [rootRecord]
        )

        # Checked key by key so extra keys in the result do not affect the outcome.
        expectations = (
            ("effectiveNeutralize", True),
            ("effectiveScope", "mandate"),
            ("effectiveRagIndexEnabled", True),
        )
        for tripletKey, expectedValue in expectations:
            self.assertEqual(triplet[tripletKey], expectedValue)

    def test_fds_triplet_inherits_from_workspace_wildcard(self):
        """A ``tableName == "*"`` record's flags are expected on a concrete table."""
        wildcardRecord = {
            "id": "ws",
            "workspaceInstanceId": "ws-inst",
            "featureInstanceId": "fi1",
            "tableName": "*",
            "recordFilter": None,
            "neutralize": True,
            "scope": "mandate",
            "ragIndexEnabled": True,
        }

        triplet = _buildTree._effectiveTripletFds("fi1", "Pos", None, [wildcardRecord])

        # Checked key by key so extra keys in the result do not affect the outcome.
        expectations = (
            ("effectiveNeutralize", True),
            ("effectiveScope", "mandate"),
            ("effectiveRagIndexEnabled", True),
        )
        for tripletKey, expectedValue in expectations:
            self.assertEqual(triplet[tripletKey], expectedValue)
class TestRecordLookup(unittest.TestCase):
    """Checks for the record finders ``_findDsRecord`` and ``_findFdsRecord``."""

    def test_finds_ds_record_by_normalised_path(self):
        """A record stored as ``/folder`` is found when queried as ``/folder/``."""
        dsRecord = {
            "id": "x",
            "connectionId": "c",
            "sourceType": "msft",
            "path": "/folder",
        }

        found = _buildTree._findDsRecord([dsRecord], "c", "msft", "/folder/")
        self.assertEqual(found.get("id"), "x")

        notFound = _buildTree._findDsRecord([dsRecord], "c", "msft", "/other")
        self.assertIsNone(notFound)

    def test_finds_fds_record_with_matching_filter(self):
        """Only a query with an equal record filter returns the record."""
        fdsRecord = {
            "id": "f",
            "workspaceInstanceId": "ws",
            "featureInstanceId": "fi1",
            "tableName": "Pos",
            "recordFilter": {"id": "5"},
        }

        found = _buildTree._findFdsRecord([fdsRecord], "fi1", "Pos", {"id": "5"})
        self.assertEqual(found.get("id"), "f")

        notFound = _buildTree._findFdsRecord([fdsRecord], "fi1", "Pos", {"id": "99"})
        self.assertIsNone(notFound)

    def test_fds_record_with_none_filter_matches_only_none(self):
        """A record whose filter is ``None`` is not returned for a concrete filter."""
        fdsRecord = {
            "id": "f",
            "workspaceInstanceId": "ws",
            "featureInstanceId": "fi1",
            "tableName": "*",
            "recordFilter": None,
        }

        found = _buildTree._findFdsRecord([fdsRecord], "fi1", "*", None)
        self.assertEqual(found.get("id"), "f")

        notFound = _buildTree._findFdsRecord([fdsRecord], "fi1", "*", {"id": "1"})
        self.assertIsNone(notFound)
class TestGetChildrenForParents(unittest.TestCase):
    """End-to-end orchestrator test with mocked dependencies."""

    def _runAsync(self, coro):
        """Run *coro* to completion on a private event loop and return its result.

        A fresh loop is created and closed for every call instead of using
        ``asyncio.get_event_loop()``: implicit loop creation there is
        deprecated, and the call raises ``RuntimeError`` when no current loop
        exists (Python 3.14, or after another test used ``asyncio.run``).
        The global "current loop" is deliberately left untouched so other
        test classes are not affected.
        """
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    @staticmethod
    def _makeCtx():
        """Return a mocked context with ``user.id == "u1"`` and ``mandateId == "m1"``."""
        ctx = MagicMock()
        ctx.user.id = "u1"
        ctx.mandateId = "m1"
        return ctx

    def test_unknown_parent_key_returns_empty_list(self):
        """A parent key with an unknown kind maps to an empty child list."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot:
            rootIf = MagicMock()
            rootIf.db.getRecordset.return_value = []
            mockRoot.return_value = rootIf

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", ["bogus|key"], self._makeCtx())
            )

        self.assertEqual(result["bogus|key"], [])

    def test_top_level_emits_personal_root_first(self):
        """Top-level emits personalRoot first, then mandate-group nodes inline."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot:
            rootIf = MagicMock()
            rootIf.db.getRecordset.return_value = []
            rootIf.getUserMandates.return_value = []
            mockRoot.return_value = rootIf

            # A ``None`` parent requests the top level; results arrive under "__root__".
            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", [None], self._makeCtx())
            )

        children = result["__root__"]
        self.assertGreaterEqual(len(children), 1)

        personalRoot = children[0]
        self.assertEqual(personalRoot["key"], "personalRoot")
        self.assertEqual(personalRoot["kind"], "synthRoot")
        self.assertIsNone(personalRoot["parentKey"])
        self.assertTrue(personalRoot["hasChildren"])
        self.assertTrue(personalRoot["defaultExpanded"])
class TestTopLevelLayout(unittest.TestCase):
    """Tests for the flat top-level layout (personalRoot + mandate groups)."""

    def _runAsync(self, coro):
        """Run *coro* to completion on a private event loop and return its result.

        A fresh loop is created and closed for every call instead of using
        ``asyncio.get_event_loop()``: implicit loop creation there is
        deprecated, and the call raises ``RuntimeError`` when no current loop
        exists (Python 3.14, or after another test used ``asyncio.run``).
        The global "current loop" is deliberately left untouched so other
        test classes are not affected.
        """
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    @staticmethod
    def _makeCtx(mandateId):
        """Return a mocked context for user ``u1`` with the given ``mandateId``."""
        ctx = MagicMock()
        ctx.user.id = "u1"
        ctx.mandateId = mandateId
        return ctx

    @staticmethod
    def _makeRootInterface():
        """Return a mocked root interface whose ``db.getRecordset`` yields no rows."""
        rootIf = MagicMock()
        rootIf.db.getRecordset.return_value = []
        return rootIf

    @staticmethod
    def _addMandate(rootIf, featureInstances):
        """Give *rootIf* a single user mandate ``m1`` owning *featureInstances*."""
        userMandate = MagicMock()
        userMandate.mandateId = "m1"
        rootIf.getUserMandates.return_value = [userMandate]
        rootIf.getFeatureInstancesByMandate.return_value = featureInstances

    @staticmethod
    def _addEnabledTrusteeFeature(rootIf):
        """Give *rootIf* mandate ``m1`` with an enabled, accessible ``trustee`` instance."""
        featureInst = MagicMock()
        featureInst.id = "fi-1"
        featureInst.featureCode = "trustee"
        featureInst.enabled = True
        TestTopLevelLayout._addMandate(rootIf, [featureInst])

        featureAccess = MagicMock()
        featureAccess.enabled = True
        rootIf.getFeatureAccess.return_value = featureAccess

    @staticmethod
    def _makeCatalog():
        """Return a mocked catalog reporting ``trustee`` as the only feature with data objects."""
        catalog = MagicMock()
        catalog.getFeaturesWithDataObjects.return_value = ["trustee"]
        return catalog

    @staticmethod
    def _makeChatService(connections):
        """Return a mocked service whose ``getUserConnections`` yields *connections*."""
        chatService = MagicMock()
        chatService.getUserConnections.return_value = connections
        return chatService

    def test_personal_root_carries_neutral_default_triplet(self):
        """personalRoot carries the default triplet and no data-source binding."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot:
            rootIf = self._makeRootInterface()
            rootIf.getUserMandates.return_value = []
            mockRoot.return_value = rootIf

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", [None], self._makeCtx("m1"))
            )

        personalRoot = result["__root__"][0]
        self.assertFalse(personalRoot["effectiveNeutralize"])
        self.assertEqual(personalRoot["effectiveScope"], "personal")
        self.assertFalse(personalRoot["effectiveRagIndexEnabled"])
        self.assertFalse(personalRoot["supportsRag"])
        self.assertFalse(personalRoot["canBeAdded"])
        self.assertIsNone(personalRoot["dataSourceId"])
        self.assertIsNone(personalRoot["modelType"])

    def test_personal_root_emits_active_connection_with_correct_parent(self):
        """An active connection appears as a ``connection`` child of personalRoot."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \
                patch("modules.serviceCenter.getService") as mockGetService:
            mockRoot.return_value = self._makeRootInterface()
            mockGetService.return_value = self._makeChatService([{
                "id": "conn-1",
                "status": "active",
                "authority": "msft",
                "externalEmail": "user@example.com",
            }])

            result = self._runAsync(
                _buildTree.getChildrenForParents(
                    "inst-1", ["personalRoot"], self._makeCtx("m1")
                )
            )

        children = result["personalRoot"]
        self.assertEqual(len(children), 1)

        connectionNode = children[0]
        self.assertEqual(connectionNode["key"], "conn|conn-1")
        self.assertEqual(connectionNode["kind"], "connection")
        self.assertEqual(connectionNode["parentKey"], "personalRoot")
        self.assertEqual(connectionNode["label"], "user@example.com")
        self.assertTrue(connectionNode["supportsRag"])

    def test_personal_root_skips_inactive_connection(self):
        """Only the connection with status ``active`` is listed; ``expired`` is dropped."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \
                patch("modules.serviceCenter.getService") as mockGetService:
            mockRoot.return_value = self._makeRootInterface()
            mockGetService.return_value = self._makeChatService([
                {"id": "c1", "status": "active", "authority": "msft", "externalEmail": "a"},
                {"id": "c2", "status": "expired", "authority": "google", "externalEmail": "b"},
            ])

            result = self._runAsync(
                _buildTree.getChildrenForParents(
                    "inst-1", ["personalRoot"], self._makeCtx("m1")
                )
            )

        self.assertEqual(len(result["personalRoot"]), 1)
        self.assertEqual(result["personalRoot"][0]["connectionId"], "c1")

    def test_mandate_groups_emitted_inline_at_top_level(self):
        """A mandate with a data-bearing feature shows up as a top-level ``mandateGroup``."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \
                patch("modules.security.rbacCatalog.getCatalogService") as mockCatalog:
            rootIf = self._makeRootInterface()
            self._addEnabledTrusteeFeature(rootIf)
            mockRoot.return_value = rootIf
            mockCatalog.return_value = self._makeCatalog()

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", [None], self._makeCtx(None))
            )

        children = result["__root__"]
        byKey = {c["key"]: c for c in children}
        self.assertIn("personalRoot", byKey)
        self.assertIn("mgrp|m1", byKey)

        mgroup = byKey["mgrp|m1"]
        self.assertEqual(mgroup["kind"], "mandateGroup")
        self.assertIsNone(mgroup["parentKey"])
        self.assertEqual(mgroup["mandateId"], "m1")
        self.assertTrue(mgroup["defaultExpanded"])
        self.assertFalse(mgroup["supportsRag"])

    def test_top_level_omits_mandates_without_data_features(self):
        """A mandate without feature instances produces no top-level node."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \
                patch("modules.security.rbacCatalog.getCatalogService") as mockCatalog:
            rootIf = self._makeRootInterface()
            self._addMandate(rootIf, [])
            mockRoot.return_value = rootIf
            mockCatalog.return_value = self._makeCatalog()

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", [None], self._makeCtx(None))
            )

        keys = [c["key"] for c in result["__root__"]]
        self.assertEqual(keys, ["personalRoot"])

    def test_personal_root_listed_first_via_display_order(self):
        """personalRoot stays first, with ``displayOrder`` 0, when a mandate group exists."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \
                patch("modules.security.rbacCatalog.getCatalogService") as mockCatalog:
            rootIf = self._makeRootInterface()
            self._addEnabledTrusteeFeature(rootIf)
            mockRoot.return_value = rootIf
            mockCatalog.return_value = self._makeCatalog()

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", [None], self._makeCtx(None))
            )

        children = result["__root__"]
        self.assertEqual(children[0]["key"], "personalRoot")
        self.assertEqual(children[0]["displayOrder"], 0)
class TestFeatureTableFields(unittest.TestCase):
    """Per-column field expansion under a feature data-source table."""

    @staticmethod
    def _expandFields(fieldNames, allFds):
        """Call ``_featureTableFields`` for table ``TrusteePosition`` of instance ``fi-1``."""
        return _buildTree._featureTableFields(
            parentKey="fdstbl|fi-1|TrusteePosition",
            featureInstanceId="fi-1",
            tableName="TrusteePosition",
            fieldNames=fieldNames,
            allFds=allFds,
        )

    @staticmethod
    def _tableRecord(neutralize, neutralizeFields):
        """Build an unfiltered record for the table with the given neutralize settings."""
        return {
            "id": "f",
            "workspaceInstanceId": "ws-1",
            "featureInstanceId": "fi-1",
            "tableName": "TrusteePosition",
            "recordFilter": None,
            "neutralize": neutralize,
            "neutralizeFields": neutralizeFields,
            "scope": None,
            "ragIndexEnabled": False,
        }

    def test_emits_one_node_per_field(self):
        """Three field names give three nodes; the first is a childless ``fdsField`` leaf."""
        fieldNodes = self._expandFields(["id", "valuta", "company"], [])
        self.assertEqual(len(fieldNodes), 3)

        firstNode = fieldNodes[0]
        self.assertEqual(firstNode["kind"], "fdsField")
        self.assertEqual(firstNode["fieldName"], "id")
        self.assertEqual(firstNode["parentKey"], "fdstbl|fi-1|TrusteePosition")
        self.assertEqual(firstNode["key"], "fdsfld|fi-1|TrusteePosition|id")
        self.assertFalse(firstNode["hasChildren"])
        self.assertFalse(firstNode["supportsRag"])

    def test_field_neutralize_inherits_from_table_blanket(self):
        """A table record with ``neutralize=True`` marks both fields as neutralized."""
        blanketRecord = self._tableRecord(True, None)

        fieldNodes = self._expandFields(["email", "company"], [blanketRecord])

        self.assertTrue(fieldNodes[0]["effectiveNeutralize"])
        self.assertTrue(fieldNodes[1]["effectiveNeutralize"])

    def test_field_neutralize_explicit_via_neutralize_fields(self):
        """Only the field named in ``neutralizeFields`` is marked as neutralized."""
        selectiveRecord = self._tableRecord(False, ["email"])

        fieldNodes = self._expandFields(["email", "company"], [selectiveRecord])

        nodesByField = {node["fieldName"]: node for node in fieldNodes}
        self.assertTrue(nodesByField["email"]["effectiveNeutralize"])
        self.assertFalse(nodesByField["company"]["effectiveNeutralize"])
if __name__ == "__main__":
    # Running this file directly hands control to unittest's command-line runner.
    unittest.main()