359 lines
15 KiB
Python
359 lines
15 KiB
Python
"""Unit tests for the generic UDB tree builder.

Verifies key encoding/decoding and that children for parent keys with
existing handlers (top-level, conn, mgrp, feat) are produced with the
correct effective-flag triplet.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import unittest
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from modules.serviceCenter.services.serviceKnowledge import _buildTree
|
|
|
|
|
|
class TestKeyCoding(unittest.TestCase):
    """Checks for the ``_encode`` / ``_decode`` helpers that handle node keys."""

    def test_encode_decode_roundtrip(self):
        """Decoding an encoded key gives back the kind and the list of parts."""
        segments = ["conn-1", "sharepointFolder", "/sites/x"]
        encoded = _buildTree._encode("ds", *segments)
        decodedKind, decodedParts = _buildTree._decode(encoded)
        self.assertEqual(decodedKind, "ds")
        self.assertEqual(decodedParts, segments)

    def test_top_level_kinds(self):
        """The first key segment is reported as the kind, the rest as parts."""
        expectedKinds = (
            ("conn|abc", "conn"),
            ("mgrp|m1", "mgrp"),
        )
        for rawKey, expectedKind in expectedKinds:
            self.assertEqual(_buildTree._decode(rawKey)[0], expectedKind)

        featureParts = _buildTree._decode("feat|m1|trustee|fi-1")[1]
        self.assertEqual(featureParts, ["m1", "trustee", "fi-1"])
|
|
|
|
|
|
class TestEffectiveTriplets(unittest.TestCase):
    """Checks for the effective neutralize / scope / RAG-index triplet helpers."""

    def _assertTriplet(self, triplet, neutralize, scope, ragIndexEnabled):
        """Assert the three effective-flag entries of ``triplet`` in order."""
        self.assertEqual(triplet["effectiveNeutralize"], neutralize)
        self.assertEqual(triplet["effectiveScope"], scope)
        self.assertEqual(triplet["effectiveRagIndexEnabled"], ragIndexEnabled)

    def test_ds_triplet_no_record_returns_defaults(self):
        """With an empty record list the data-source triplet is the default one."""
        triplet = _buildTree._effectiveTripletDs("c", "msft", "/", [])
        expectedDefaults = {
            "effectiveNeutralize": False,
            "effectiveScope": "personal",
            "effectiveRagIndexEnabled": False,
        }
        self.assertEqual(triplet, expectedDefaults)

    def test_ds_triplet_inherits_from_root(self):
        """A record stored at path "/" supplies the flags for a deeper path."""
        rootRecord = dict(
            id="r",
            connectionId="c",
            sourceType="msft",
            path="/",
            neutralize=True,
            scope="mandate",
            ragIndexEnabled=True,
        )
        triplet = _buildTree._effectiveTripletDs(
            "c", "sharepointFolder", "/sites/x", [rootRecord]
        )
        self._assertTriplet(triplet, True, "mandate", True)

    def test_fds_triplet_inherits_from_workspace_wildcard(self):
        """A "*" table record supplies the flags for a concrete table name."""
        wildcardRecord = dict(
            id="ws",
            workspaceInstanceId="ws-inst",
            featureInstanceId="fi1",
            tableName="*",
            recordFilter=None,
            neutralize=True,
            scope="mandate",
            ragIndexEnabled=True,
        )
        triplet = _buildTree._effectiveTripletFds("fi1", "Pos", None, [wildcardRecord])
        self._assertTriplet(triplet, True, "mandate", True)
|
|
|
|
|
|
class TestRecordLookup(unittest.TestCase):
    """Checks for the ``_findDsRecord`` / ``_findFdsRecord`` lookup helpers."""

    def test_finds_ds_record_by_normalised_path(self):
        """A trailing slash on the lookup path still matches; another path does not."""
        stored = {"id": "x", "connectionId": "c", "sourceType": "msft", "path": "/folder"}

        hit = _buildTree._findDsRecord([stored], "c", "msft", "/folder/")
        self.assertEqual(hit.get("id"), "x")

        miss = _buildTree._findDsRecord([stored], "c", "msft", "/other")
        self.assertIsNone(miss)

    def test_finds_fds_record_with_matching_filter(self):
        """Only an equal ``recordFilter`` selects the stored record."""
        stored = {
            "id": "f",
            "workspaceInstanceId": "ws",
            "featureInstanceId": "fi1",
            "tableName": "Pos",
            "recordFilter": {"id": "5"},
        }

        hit = _buildTree._findFdsRecord([stored], "fi1", "Pos", {"id": "5"})
        self.assertEqual(hit.get("id"), "f")

        miss = _buildTree._findFdsRecord([stored], "fi1", "Pos", {"id": "99"})
        self.assertIsNone(miss)

    def test_fds_record_with_none_filter_matches_only_none(self):
        """A record stored without a filter is not returned for a concrete filter."""
        stored = {
            "id": "f",
            "workspaceInstanceId": "ws",
            "featureInstanceId": "fi1",
            "tableName": "*",
            "recordFilter": None,
        }

        hit = _buildTree._findFdsRecord([stored], "fi1", "*", None)
        self.assertEqual(hit.get("id"), "f")

        miss = _buildTree._findFdsRecord([stored], "fi1", "*", {"id": "1"})
        self.assertIsNone(miss)
|
|
|
|
|
|
class TestGetChildrenForParents(unittest.TestCase):
    """End-to-end orchestrator test with mocked dependencies."""

    def _runAsync(self, coro):
        """Run ``coro`` to completion on a private event loop and return its result.

        A dedicated loop is created and closed per call instead of going
        through ``asyncio.get_event_loop()``. Implicit loop creation via that
        function is deprecated and raises once the thread has no current
        loop (for example after another test used ``asyncio.run``), which
        made these tests depend on execution order. The thread's current-loop
        setting is deliberately left untouched so unrelated tests are not
        affected by this helper.
        """
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro)
        finally:
            # Always release the loop's resources, even when the coroutine raises.
            loop.close()

    def test_unknown_parent_key_returns_empty_list(self):
        """A parent key with an unrecognised prefix maps to an empty child list."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot:
            rootIf = MagicMock()
            rootIf.db.getRecordset.return_value = []
            mockRoot.return_value = rootIf

            ctx = MagicMock()
            ctx.user.id = "u1"
            ctx.mandateId = "m1"

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", ["bogus|key"], ctx)
            )
            self.assertEqual(result["bogus|key"], [])

    def test_top_level_emits_personal_root_first(self):
        """Top-level emits personalRoot first, then mandate-group nodes inline."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot:
            rootIf = MagicMock()
            rootIf.db.getRecordset.return_value = []
            rootIf.getUserMandates.return_value = []
            mockRoot.return_value = rootIf

            ctx = MagicMock()
            ctx.user.id = "u1"
            ctx.mandateId = "m1"

            # A ``None`` parent requests the top level; results are read back
            # under the "__root__" key.
            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", [None], ctx)
            )
            children = result["__root__"]
            self.assertGreaterEqual(len(children), 1)

            personalRoot = children[0]
            self.assertEqual(personalRoot["key"], "personalRoot")
            self.assertEqual(personalRoot["kind"], "synthRoot")
            self.assertIsNone(personalRoot["parentKey"])
            self.assertTrue(personalRoot["hasChildren"])
            self.assertTrue(personalRoot["defaultExpanded"])
|
|
|
|
|
|
class TestTopLevelLayout(unittest.TestCase):
    """Tests for the flat top-level layout (personalRoot + mandate groups)."""

    # ------------------------------------------------------------------
    # Shared fixtures
    # ------------------------------------------------------------------

    def _runAsync(self, coro):
        """Run ``coro`` to completion on a private event loop and return its result.

        A dedicated loop is created and closed per call instead of going
        through ``asyncio.get_event_loop()``. Implicit loop creation via that
        function is deprecated and raises once the thread has no current
        loop (for example after another test used ``asyncio.run``), which
        made these tests depend on execution order. The thread's current-loop
        setting is deliberately left untouched so unrelated tests are not
        affected by this helper.
        """
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro)
        finally:
            # Always release the loop's resources, even when the coroutine raises.
            loop.close()

    @staticmethod
    def _makeCtx(mandateId):
        """Return a mocked context for user ``u1`` carrying ``mandateId``."""
        ctx = MagicMock()
        ctx.user.id = "u1"
        ctx.mandateId = mandateId
        return ctx

    @staticmethod
    def _makeRootIf():
        """Return a mocked root interface whose ``db.getRecordset`` yields no rows."""
        rootIf = MagicMock()
        rootIf.db.getRecordset.return_value = []
        return rootIf

    @staticmethod
    def _makeUserMandate(mandateId):
        """Return a mocked user-mandate entry exposing ``mandateId``."""
        userMandate = MagicMock()
        userMandate.mandateId = mandateId
        return userMandate

    @classmethod
    def _makeTrusteeRootIf(cls):
        """Return a root interface with mandate ``m1`` and one enabled trustee feature."""
        rootIf = cls._makeRootIf()
        rootIf.getUserMandates.return_value = [cls._makeUserMandate("m1")]

        featureInst = MagicMock()
        featureInst.id = "fi-1"
        featureInst.featureCode = "trustee"
        featureInst.enabled = True
        rootIf.getFeatureInstancesByMandate.return_value = [featureInst]

        featureAccess = MagicMock()
        featureAccess.enabled = True
        rootIf.getFeatureAccess.return_value = featureAccess
        return rootIf

    @staticmethod
    def _makeTrusteeCatalog():
        """Return a mocked catalog reporting ``trustee`` as a feature with data objects."""
        catalog = MagicMock()
        catalog.getFeaturesWithDataObjects.return_value = ["trustee"]
        return catalog

    @staticmethod
    def _makeChatService(connections):
        """Return a mocked service whose ``getUserConnections`` yields ``connections``."""
        chatService = MagicMock()
        chatService.getUserConnections.return_value = connections
        return chatService

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_personal_root_carries_neutral_default_triplet(self):
        """personalRoot reports the default triplet and no RAG/add/data-source info."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot:
            rootIf = self._makeRootIf()
            rootIf.getUserMandates.return_value = []
            mockRoot.return_value = rootIf

            ctx = self._makeCtx("m1")

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", [None], ctx)
            )
            personalRoot = result["__root__"][0]
            self.assertFalse(personalRoot["effectiveNeutralize"])
            self.assertEqual(personalRoot["effectiveScope"], "personal")
            self.assertFalse(personalRoot["effectiveRagIndexEnabled"])
            self.assertFalse(personalRoot["supportsRag"])
            self.assertFalse(personalRoot["canBeAdded"])
            self.assertIsNone(personalRoot["dataSourceId"])
            self.assertIsNone(personalRoot["modelType"])

    def test_personal_root_emits_active_connection_with_correct_parent(self):
        """An active connection appears as a ``conn|<id>`` child of personalRoot."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \
                patch("modules.serviceCenter.getService") as mockGetService:
            mockRoot.return_value = self._makeRootIf()
            mockGetService.return_value = self._makeChatService([{
                "id": "conn-1",
                "status": "active",
                "authority": "msft",
                "externalEmail": "user@example.com",
            }])

            ctx = self._makeCtx("m1")

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", ["personalRoot"], ctx)
            )
            children = result["personalRoot"]
            self.assertEqual(len(children), 1)

            connection = children[0]
            self.assertEqual(connection["key"], "conn|conn-1")
            self.assertEqual(connection["kind"], "connection")
            self.assertEqual(connection["parentKey"], "personalRoot")
            self.assertEqual(connection["label"], "user@example.com")
            self.assertTrue(connection["supportsRag"])

    def test_personal_root_skips_inactive_connection(self):
        """A connection whose status is not ``active`` is left out of the children."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \
                patch("modules.serviceCenter.getService") as mockGetService:
            mockRoot.return_value = self._makeRootIf()
            mockGetService.return_value = self._makeChatService([
                {"id": "c1", "status": "active", "authority": "msft", "externalEmail": "a"},
                {"id": "c2", "status": "expired", "authority": "google", "externalEmail": "b"},
            ])

            ctx = self._makeCtx("m1")

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", ["personalRoot"], ctx)
            )
            self.assertEqual(len(result["personalRoot"]), 1)
            self.assertEqual(result["personalRoot"][0]["connectionId"], "c1")

    def test_mandate_groups_emitted_inline_at_top_level(self):
        """A mandate with an enabled data feature shows up as ``mgrp|<id>`` at top level."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \
                patch("modules.security.rbacCatalog.getCatalogService") as mockCatalog:
            mockRoot.return_value = self._makeTrusteeRootIf()
            mockCatalog.return_value = self._makeTrusteeCatalog()

            ctx = self._makeCtx(None)

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", [None], ctx)
            )
            children = result["__root__"]
            byKey = {c["key"]: c for c in children}
            self.assertIn("personalRoot", byKey)
            self.assertIn("mgrp|m1", byKey)

            mgroup = byKey["mgrp|m1"]
            self.assertEqual(mgroup["kind"], "mandateGroup")
            self.assertIsNone(mgroup["parentKey"])
            self.assertEqual(mgroup["mandateId"], "m1")
            self.assertTrue(mgroup["defaultExpanded"])
            self.assertFalse(mgroup["supportsRag"])

    def test_top_level_omits_mandates_without_data_features(self):
        """A mandate without feature instances produces no mandate-group node."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \
                patch("modules.security.rbacCatalog.getCatalogService") as mockCatalog:
            rootIf = self._makeRootIf()
            rootIf.getUserMandates.return_value = [self._makeUserMandate("m1")]
            rootIf.getFeatureInstancesByMandate.return_value = []
            mockRoot.return_value = rootIf
            mockCatalog.return_value = self._makeTrusteeCatalog()

            ctx = self._makeCtx(None)

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", [None], ctx)
            )
            keys = [c["key"] for c in result["__root__"]]
            self.assertEqual(keys, ["personalRoot"])

    def test_personal_root_listed_first_via_display_order(self):
        """personalRoot stays first and carries ``displayOrder`` 0 next to a mandate group."""
        with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \
                patch("modules.security.rbacCatalog.getCatalogService") as mockCatalog:
            mockRoot.return_value = self._makeTrusteeRootIf()
            mockCatalog.return_value = self._makeTrusteeCatalog()

            ctx = self._makeCtx(None)

            result = self._runAsync(
                _buildTree.getChildrenForParents("inst-1", [None], ctx)
            )
            children = result["__root__"]
            self.assertEqual(children[0]["key"], "personalRoot")
            self.assertEqual(children[0]["displayOrder"], 0)
|
|
|
|
|
|
class TestFeatureTableFields(unittest.TestCase):
    """Per-column field expansion under a feature data-source table."""

    @staticmethod
    def _fieldNodes(fieldNames, allFds):
        """Call ``_featureTableFields`` for table ``TrusteePosition`` of ``fi-1``."""
        return _buildTree._featureTableFields(
            parentKey="fdstbl|fi-1|TrusteePosition",
            featureInstanceId="fi-1",
            tableName="TrusteePosition",
            fieldNames=fieldNames,
            allFds=allFds,
        )

    @staticmethod
    def _tableRecord(neutralize, neutralizeFields):
        """Build a stored table-level record with the given neutralize settings."""
        return {
            "id": "f",
            "workspaceInstanceId": "ws-1",
            "featureInstanceId": "fi-1",
            "tableName": "TrusteePosition",
            "recordFilter": None,
            "neutralize": neutralize,
            "neutralizeFields": neutralizeFields,
            "scope": None,
            "ragIndexEnabled": False,
        }

    def test_emits_one_node_per_field(self):
        """Each field name yields one leaf node keyed ``fdsfld|<inst>|<table>|<field>``."""
        nodes = self._fieldNodes(["id", "valuta", "company"], [])
        self.assertEqual(len(nodes), 3)

        firstNode = nodes[0]
        self.assertEqual(firstNode["kind"], "fdsField")
        self.assertEqual(firstNode["fieldName"], "id")
        self.assertEqual(firstNode["parentKey"], "fdstbl|fi-1|TrusteePosition")
        self.assertEqual(firstNode["key"], "fdsfld|fi-1|TrusteePosition|id")
        self.assertFalse(firstNode["hasChildren"])
        self.assertFalse(firstNode["supportsRag"])

    def test_field_neutralize_inherits_from_table_blanket(self):
        """A table record with ``neutralize`` set marks every field as neutralized."""
        blanket = self._tableRecord(True, None)
        nodes = self._fieldNodes(["email", "company"], [blanket])
        self.assertTrue(nodes[0]["effectiveNeutralize"])
        self.assertTrue(nodes[1]["effectiveNeutralize"])

    def test_field_neutralize_explicit_via_neutralize_fields(self):
        """Only fields named in ``neutralizeFields`` are marked as neutralized."""
        selective = self._tableRecord(False, ["email"])
        nodes = self._fieldNodes(["email", "company"], [selective])
        byField = {node["fieldName"]: node for node in nodes}
        self.assertTrue(byField["email"]["effectiveNeutralize"])
        self.assertFalse(byField["company"]["effectiveNeutralize"])
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|