#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Purge tests for KnowledgeObjects.deleteFileContentIndexByConnectionId.

Ensures that a `connection.revoked` event wipes every FileContentIndex + chunk
linked to the given connectionId while leaving entries from other connections
(or upload-files with connectionId=None) intact.
"""

import os
import sys

# Must run before the ``modules.*`` imports below: it puts the directory three
# levels above this file on sys.path (presumably the project root — confirm
# against the repository layout) so those imports resolve when the file is
# executed directly.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))

from modules.datamodels.datamodelKnowledge import FileContentIndex, ContentChunk
from modules.interfaces.interfaceDbKnowledge import KnowledgeObjects


class _FakeDb:
    """Minimal in-memory stand-in for ``KnowledgeObjects.db``.

    Supports just the subset of APIs that deleteFileContentIndexByConnectionId
    touches: getRecordset(FileContentIndex|ContentChunk, ...) + recordDelete.

    Rows are plain dicts stored by their ``"id"`` value; FileContentIndex rows
    and ContentChunk rows live in separate dictionaries.
    """

    def __init__(self):
        # Both maps are keyed by the row's "id" field.
        self.indexRows: dict = {}
        self.chunks: dict = {}

    def addIndex(self, row: dict) -> None:
        """Store ``row`` as a FileContentIndex record under ``row["id"]``."""
        self.indexRows[row["id"]] = row

    def addChunk(self, row: dict) -> None:
        """Store ``row`` as a ContentChunk record under ``row["id"]``."""
        self.chunks[row["id"]] = row

    def getRecordset(self, modelClass, recordFilter=None, **_) -> list:
        """Return the stored rows of ``modelClass`` that satisfy ``recordFilter``.

        A row matches when, for every key/value pair in ``recordFilter``,
        ``row.get(key) == value``; a missing key therefore compares as ``None``.
        An empty or missing filter matches every row. Model classes other than
        FileContentIndex and ContentChunk yield an empty list. Any additional
        keyword arguments are accepted and ignored.
        """
        criteria = recordFilter or {}
        if modelClass is FileContentIndex:
            candidates = list(self.indexRows.values())
        elif modelClass is ContentChunk:
            candidates = list(self.chunks.values())
        else:
            return []
        return [
            row
            for row in candidates
            if all(row.get(key) == value for key, value in criteria.items())
        ]

    def recordDelete(self, modelClass, recordId) -> bool:
        """Remove the row ``recordId`` of ``modelClass``.

        Returns True when a row was actually removed, False when the id was
        not present or ``modelClass`` is not one of the two supported models.
        """
        if modelClass is FileContentIndex:
            return self.indexRows.pop(recordId, None) is not None
        if modelClass is ContentChunk:
            return self.chunks.pop(recordId, None) is not None
        return False


def _buildKnowledge():
    """Instantiate KnowledgeObjects without triggering the real DB bootstrap.

    ``__new__`` is used so that ``KnowledgeObjects.__init__`` never runs; the
    attributes set here are assigned by hand and ``db`` is replaced with a
    fresh ``_FakeDb``.
    """
    ko = KnowledgeObjects.__new__(KnowledgeObjects)
    ko.currentUser = None
    ko.userId = None
    ko._scopeCache = {}
    ko.db = _FakeDb()
    return ko


def test_purge_by_connection_removes_only_matching_rows():
    """Purging "cx" deletes its two index rows and three chunks, nothing else."""
    ko = _buildKnowledge()
    # Two rows on the purged connection, one upload without a connection, and
    # one row that belongs to a different connection.
    ko.db.addIndex({"id": "sp1", "connectionId": "cx", "mandateId": "m1", "sourceKind": "sharepoint_item"})
    ko.db.addIndex({"id": "sp2", "connectionId": "cx", "mandateId": "m1", "sourceKind": "sharepoint_item"})
    ko.db.addIndex({"id": "upload", "connectionId": None, "mandateId": "m1", "sourceKind": "file"})
    ko.db.addIndex({"id": "other", "connectionId": "cy", "mandateId": "m1", "sourceKind": "outlook_message"})
    ko.db.addChunk({"id": "c1", "fileId": "sp1"})
    ko.db.addChunk({"id": "c2", "fileId": "sp1"})
    ko.db.addChunk({"id": "c3", "fileId": "sp2"})
    ko.db.addChunk({"id": "c4", "fileId": "upload"})
    ko.db.addChunk({"id": "c5", "fileId": "other"})

    result = ko.deleteFileContentIndexByConnectionId("cx")

    assert result == {"indexRows": 2, "chunks": 3}
    assert "sp1" not in ko.db.indexRows
    assert "sp2" not in ko.db.indexRows
    assert "upload" in ko.db.indexRows
    assert "other" in ko.db.indexRows
    assert set(ko.db.chunks.keys()) == {"c4", "c5"}


def test_purge_with_empty_connection_id_is_a_noop():
    """An empty connectionId reports zero deletions and leaves all data alone."""
    ko = _buildKnowledge()
    ko.db.addIndex({"id": "sp1", "connectionId": "cx"})
    ko.db.addChunk({"id": "c1", "fileId": "sp1"})

    result = ko.deleteFileContentIndexByConnectionId("")

    assert result == {"indexRows": 0, "chunks": 0}
    assert "sp1" in ko.db.indexRows
    # The seeded chunk must survive as well, otherwise the call was not a no-op.
    assert "c1" in ko.db.chunks


def test_purge_unknown_connection_returns_zero():
    """A connectionId with no matching rows deletes nothing."""
    ko = _buildKnowledge()
    ko.db.addIndex({"id": "sp1", "connectionId": "cx"})

    result = ko.deleteFileContentIndexByConnectionId("nope")

    assert result == {"indexRows": 0, "chunks": 0}
    assert "sp1" in ko.db.indexRows


if __name__ == "__main__":
    # Allows running the checks directly as a script, without a test runner.
    test_purge_by_connection_removes_only_matching_rows()
    test_purge_with_empty_connection_id_is_a_noop()
    test_purge_unknown_connection_returns_zero()
    print("OK — connection-purge tests passed")