"""Unit tests for the generic UDB tree builder. Verifies key encoding/decoding and that children for parent keys with existing handlers (top-level, conn, mgrp, feat) are produced with the correct effective-flag triplet. """ from __future__ import annotations import asyncio import unittest from unittest.mock import MagicMock, patch from modules.serviceCenter.services.serviceKnowledge import _buildTree class TestKeyCoding(unittest.TestCase): def test_encode_decode_roundtrip(self): key = _buildTree._encode("ds", "conn-1", "sharepointFolder", "/sites/x") kind, parts = _buildTree._decode(key) self.assertEqual(kind, "ds") self.assertEqual(parts, ["conn-1", "sharepointFolder", "/sites/x"]) def test_top_level_kinds(self): self.assertEqual(_buildTree._decode("conn|abc")[0], "conn") self.assertEqual(_buildTree._decode("mgrp|m1")[0], "mgrp") self.assertEqual(_buildTree._decode("feat|m1|trustee|fi-1")[1], ["m1", "trustee", "fi-1"]) class TestEffectiveTriplets(unittest.TestCase): def test_ds_triplet_no_record_returns_defaults(self): result = _buildTree._effectiveTripletDs("c", "msft", "/", []) self.assertEqual(result, { "effectiveNeutralize": False, "effectiveScope": "personal", "effectiveRagIndexEnabled": False, }) def test_ds_triplet_inherits_from_root(self): root = { "id": "r", "connectionId": "c", "sourceType": "msft", "path": "/", "neutralize": True, "scope": "mandate", "ragIndexEnabled": True, } result = _buildTree._effectiveTripletDs("c", "sharepointFolder", "/sites/x", [root]) self.assertEqual(result["effectiveNeutralize"], True) self.assertEqual(result["effectiveScope"], "mandate") self.assertEqual(result["effectiveRagIndexEnabled"], True) def test_fds_triplet_inherits_from_workspace_wildcard(self): ws = { "id": "ws", "workspaceInstanceId": "ws-inst", "featureInstanceId": "fi1", "tableName": "*", "recordFilter": None, "neutralize": True, "scope": "mandate", "ragIndexEnabled": True, } result = _buildTree._effectiveTripletFds("fi1", "Pos", None, [ws]) self.assertEqual(result["effectiveNeutralize"], True) self.assertEqual(result["effectiveScope"], "mandate") self.assertEqual(result["effectiveRagIndexEnabled"], True) class TestRecordLookup(unittest.TestCase): def test_finds_ds_record_by_normalised_path(self): rec = {"id": "x", "connectionId": "c", "sourceType": "msft", "path": "/folder"} self.assertEqual(_buildTree._findDsRecord([rec], "c", "msft", "/folder/").get("id"), "x") self.assertIsNone(_buildTree._findDsRecord([rec], "c", "msft", "/other")) def test_finds_fds_record_with_matching_filter(self): rec = {"id": "f", "workspaceInstanceId": "ws", "featureInstanceId": "fi1", "tableName": "Pos", "recordFilter": {"id": "5"}} self.assertEqual(_buildTree._findFdsRecord([rec], "fi1", "Pos", {"id": "5"}).get("id"), "f") self.assertIsNone(_buildTree._findFdsRecord([rec], "fi1", "Pos", {"id": "99"})) def test_fds_record_with_none_filter_matches_only_none(self): rec = {"id": "f", "workspaceInstanceId": "ws", "featureInstanceId": "fi1", "tableName": "*", "recordFilter": None} self.assertEqual(_buildTree._findFdsRecord([rec], "fi1", "*", None).get("id"), "f") self.assertIsNone(_buildTree._findFdsRecord([rec], "fi1", "*", {"id": "1"})) class TestGetChildrenForParents(unittest.TestCase): """End-to-end orchestrator test with mocked dependencies.""" def _runAsync(self, coro): return asyncio.get_event_loop().run_until_complete(coro) def test_unknown_parent_key_returns_empty_list(self): with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot: rootIf = MagicMock() rootIf.db.getRecordset.return_value = [] mockRoot.return_value = rootIf ctx = MagicMock() ctx.user.id = "u1" ctx.mandateId = "m1" result = self._runAsync( _buildTree.getChildrenForParents("inst-1", ["bogus|key"], ctx) ) self.assertEqual(result["bogus|key"], []) def test_top_level_emits_personal_root_first(self): """Top-level emits personalRoot first, then mandate-group nodes inline.""" with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot: rootIf = MagicMock() rootIf.db.getRecordset.return_value = [] rootIf.getUserMandates.return_value = [] mockRoot.return_value = rootIf ctx = MagicMock() ctx.user.id = "u1" ctx.mandateId = "m1" result = self._runAsync( _buildTree.getChildrenForParents("inst-1", [None], ctx) ) children = result["__root__"] self.assertGreaterEqual(len(children), 1) personalRoot = children[0] self.assertEqual(personalRoot["key"], "personalRoot") self.assertEqual(personalRoot["kind"], "synthRoot") self.assertIsNone(personalRoot["parentKey"]) self.assertTrue(personalRoot["hasChildren"]) self.assertTrue(personalRoot["defaultExpanded"]) class TestTopLevelLayout(unittest.TestCase): """Tests for the flat top-level layout (personalRoot + mandate groups).""" def _runAsync(self, coro): return asyncio.get_event_loop().run_until_complete(coro) def test_personal_root_carries_neutral_default_triplet(self): with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot: rootIf = MagicMock() rootIf.db.getRecordset.return_value = [] rootIf.getUserMandates.return_value = [] mockRoot.return_value = rootIf ctx = MagicMock() ctx.user.id = "u1" ctx.mandateId = "m1" result = self._runAsync( _buildTree.getChildrenForParents("inst-1", [None], ctx) ) personalRoot = result["__root__"][0] self.assertFalse(personalRoot["effectiveNeutralize"]) self.assertEqual(personalRoot["effectiveScope"], "personal") self.assertFalse(personalRoot["effectiveRagIndexEnabled"]) self.assertFalse(personalRoot["supportsRag"]) self.assertFalse(personalRoot["canBeAdded"]) self.assertIsNone(personalRoot["dataSourceId"]) self.assertIsNone(personalRoot["modelType"]) def test_personal_root_emits_active_connection_with_correct_parent(self): with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \ patch("modules.serviceCenter.getService") as mockGetService: rootIf = MagicMock() rootIf.db.getRecordset.return_value = [] mockRoot.return_value = rootIf chatService = MagicMock() chatService.getUserConnections.return_value = [{ "id": "conn-1", "status": "active", "authority": "msft", "externalEmail": "user@example.com", }] mockGetService.return_value = chatService ctx = MagicMock() ctx.user.id = "u1" ctx.mandateId = "m1" result = self._runAsync( _buildTree.getChildrenForParents("inst-1", ["personalRoot"], ctx) ) children = result["personalRoot"] self.assertEqual(len(children), 1) self.assertEqual(children[0]["key"], "conn|conn-1") self.assertEqual(children[0]["kind"], "connection") self.assertEqual(children[0]["parentKey"], "personalRoot") self.assertEqual(children[0]["label"], "user@example.com") self.assertTrue(children[0]["supportsRag"]) def test_personal_root_skips_inactive_connection(self): with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \ patch("modules.serviceCenter.getService") as mockGetService: rootIf = MagicMock() rootIf.db.getRecordset.return_value = [] mockRoot.return_value = rootIf chatService = MagicMock() chatService.getUserConnections.return_value = [ {"id": "c1", "status": "active", "authority": "msft", "externalEmail": "a"}, {"id": "c2", "status": "expired", "authority": "google", "externalEmail": "b"}, ] mockGetService.return_value = chatService ctx = MagicMock() ctx.user.id = "u1" ctx.mandateId = "m1" result = self._runAsync( _buildTree.getChildrenForParents("inst-1", ["personalRoot"], ctx) ) self.assertEqual(len(result["personalRoot"]), 1) self.assertEqual(result["personalRoot"][0]["connectionId"], "c1") def test_mandate_groups_emitted_inline_at_top_level(self): with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \ patch("modules.security.rbacCatalog.getCatalogService") as mockCatalog: rootIf = MagicMock() rootIf.db.getRecordset.return_value = [] userMandate = MagicMock() userMandate.mandateId = "m1" rootIf.getUserMandates.return_value = [userMandate] featureInst = MagicMock() featureInst.id = "fi-1" featureInst.featureCode = "trustee" featureInst.enabled = True rootIf.getFeatureInstancesByMandate.return_value = [featureInst] featureAccess = MagicMock() featureAccess.enabled = True rootIf.getFeatureAccess.return_value = featureAccess mockRoot.return_value = rootIf catalog = MagicMock() catalog.getFeaturesWithDataObjects.return_value = ["trustee"] mockCatalog.return_value = catalog ctx = MagicMock() ctx.user.id = "u1" ctx.mandateId = None result = self._runAsync( _buildTree.getChildrenForParents("inst-1", [None], ctx) ) children = result["__root__"] byKey = {c["key"]: c for c in children} self.assertIn("personalRoot", byKey) self.assertIn("mgrp|m1", byKey) mgroup = byKey["mgrp|m1"] self.assertEqual(mgroup["kind"], "mandateGroup") self.assertIsNone(mgroup["parentKey"]) self.assertEqual(mgroup["mandateId"], "m1") self.assertTrue(mgroup["defaultExpanded"]) self.assertFalse(mgroup["supportsRag"]) def test_top_level_omits_mandates_without_data_features(self): with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \ patch("modules.security.rbacCatalog.getCatalogService") as mockCatalog: rootIf = MagicMock() rootIf.db.getRecordset.return_value = [] userMandate = MagicMock() userMandate.mandateId = "m1" rootIf.getUserMandates.return_value = [userMandate] rootIf.getFeatureInstancesByMandate.return_value = [] mockRoot.return_value = rootIf catalog = MagicMock() catalog.getFeaturesWithDataObjects.return_value = ["trustee"] mockCatalog.return_value = catalog ctx = MagicMock() ctx.user.id = "u1" ctx.mandateId = None result = self._runAsync( _buildTree.getChildrenForParents("inst-1", [None], ctx) ) keys = [c["key"] for c in result["__root__"]] self.assertEqual(keys, ["personalRoot"]) def test_personal_root_listed_first_via_display_order(self): with patch("modules.interfaces.interfaceDbApp.getRootInterface") as mockRoot, \ patch("modules.security.rbacCatalog.getCatalogService") as mockCatalog: rootIf = MagicMock() rootIf.db.getRecordset.return_value = [] userMandate = MagicMock() userMandate.mandateId = "m1" rootIf.getUserMandates.return_value = [userMandate] featureInst = MagicMock() featureInst.id = "fi-1" featureInst.featureCode = "trustee" featureInst.enabled = True rootIf.getFeatureInstancesByMandate.return_value = [featureInst] featureAccess = MagicMock() featureAccess.enabled = True rootIf.getFeatureAccess.return_value = featureAccess mockRoot.return_value = rootIf catalog = MagicMock() catalog.getFeaturesWithDataObjects.return_value = ["trustee"] mockCatalog.return_value = catalog ctx = MagicMock() ctx.user.id = "u1" ctx.mandateId = None result = self._runAsync( _buildTree.getChildrenForParents("inst-1", [None], ctx) ) children = result["__root__"] self.assertEqual(children[0]["key"], "personalRoot") self.assertEqual(children[0]["displayOrder"], 0) class TestFeatureTableFields(unittest.TestCase): """Per-column field expansion under a feature data-source table.""" def test_emits_one_node_per_field(self): nodes = _buildTree._featureTableFields( parentKey="fdstbl|fi-1|TrusteePosition", featureInstanceId="fi-1", tableName="TrusteePosition", fieldNames=["id", "valuta", "company"], allFds=[], ) self.assertEqual(len(nodes), 3) self.assertEqual(nodes[0]["kind"], "fdsField") self.assertEqual(nodes[0]["fieldName"], "id") self.assertEqual(nodes[0]["parentKey"], "fdstbl|fi-1|TrusteePosition") self.assertEqual(nodes[0]["key"], "fdsfld|fi-1|TrusteePosition|id") self.assertFalse(nodes[0]["hasChildren"]) self.assertFalse(nodes[0]["supportsRag"]) def test_field_neutralize_inherits_from_table_blanket(self): rec = {"id": "f", "workspaceInstanceId": "ws-1", "featureInstanceId": "fi-1", "tableName": "TrusteePosition", "recordFilter": None, "neutralize": True, "neutralizeFields": None, "scope": None, "ragIndexEnabled": False} nodes = _buildTree._featureTableFields( parentKey="fdstbl|fi-1|TrusteePosition", featureInstanceId="fi-1", tableName="TrusteePosition", fieldNames=["email", "company"], allFds=[rec], ) self.assertTrue(nodes[0]["effectiveNeutralize"]) self.assertTrue(nodes[1]["effectiveNeutralize"]) def test_field_neutralize_explicit_via_neutralize_fields(self): rec = {"id": "f", "workspaceInstanceId": "ws-1", "featureInstanceId": "fi-1", "tableName": "TrusteePosition", "recordFilter": None, "neutralize": False, "neutralizeFields": ["email"], "scope": None, "ragIndexEnabled": False} nodes = _buildTree._featureTableFields( parentKey="fdstbl|fi-1|TrusteePosition", featureInstanceId="fi-1", tableName="TrusteePosition", fieldNames=["email", "company"], allFds=[rec], ) byField = {n["fieldName"]: n for n in nodes} self.assertTrue(byField["email"]["effectiveNeutralize"]) self.assertFalse(byField["company"]["effectiveNeutralize"]) if __name__ == "__main__": unittest.main()