From 6a5ff1ff7cc9d3aeaf4a014209150bb40cfc7537 Mon Sep 17 00:00:00 2001 From: Ida Date: Tue, 21 Apr 2026 13:44:21 +0200 Subject: [PATCH] feat(rag): P1 user-connection hooks + retrieval threshold fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - connection.established/revoked callbacks from OAuth routes and connection management endpoints - KnowledgeIngestionConsumer dispatches bootstrap job (established) and synchronous purge (revoked) - FileContentIndex: add connectionId + sourceKind columns - SharePoint bootstrap with @odata.nextLink pagination and eTag-based idempotency - Outlook bootstrap treats messages as virtual documents with cleanEmailBody for HTML/quote/signature stripping - fix(rag): lower buildAgentContext minScore thresholds from 0.55/0.65/0.70 to 0.35 — previous values blocked all real matches from text-embedding-3-small - 24 new unit tests covering purge, consumer dispatch, email cleaning and both bootstrap paths --- app.py | 10 + .../connectors/providerMsft/connectorMsft.py | 41 +- modules/datamodels/datamodelKnowledge.py | 10 + modules/interfaces/interfaceDbKnowledge.py | 40 ++ modules/routes/routeDataConnections.py | 38 +- modules/routes/routeSecurityClickup.py | 12 + modules/routes/routeSecurityGoogle.py | 12 + modules/routes/routeSecurityMsft.py | 12 + .../serviceKnowledge/mainServiceKnowledge.py | 112 +++- .../subConnectorIngestConsumer.py | 196 +++++++ .../subConnectorSyncOutlook.py | 551 ++++++++++++++++++ .../subConnectorSyncSharepoint.py | 425 ++++++++++++++ .../services/serviceKnowledge/subTextClean.py | 107 ++++ tests/unit/services/test_bootstrap_outlook.py | 190 ++++++ .../services/test_bootstrap_sharepoint.py | 209 +++++++ tests/unit/services/test_clean_email_body.py | 110 ++++ tests/unit/services/test_connection_purge.py | 119 ++++ .../test_knowledge_ingest_consumer.py | 172 ++++++ 18 files changed, 2323 insertions(+), 43 deletions(-) create mode 100644 
modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py create mode 100644 modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py create mode 100644 modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py create mode 100644 modules/serviceCenter/services/serviceKnowledge/subTextClean.py create mode 100644 tests/unit/services/test_bootstrap_outlook.py create mode 100644 tests/unit/services/test_bootstrap_sharepoint.py create mode 100644 tests/unit/services/test_clean_email_body.py create mode 100644 tests/unit/services/test_connection_purge.py create mode 100644 tests/unit/services/test_knowledge_ingest_consumer.py diff --git a/app.py b/app.py index 41271739..52e70982 100644 --- a/app.py +++ b/app.py @@ -405,6 +405,16 @@ async def lifespan(app: FastAPI): except Exception as e: logger.warning(f"BackgroundJob recovery failed (non-critical): {e}") + # Subscribe knowledge ingestion to connection lifecycle events so OAuth + # connect/disconnect reliably trigger bootstrap/purge. 
+ try: + from modules.serviceCenter.services.serviceKnowledge.subConnectorIngestConsumer import ( + registerKnowledgeIngestionConsumer, + ) + registerKnowledgeIngestionConsumer() + except Exception as e: + logger.warning(f"KnowledgeIngestionConsumer registration failed (non-critical): {e}") + yield # --- Stop Managers --- diff --git a/modules/connectors/providerMsft/connectorMsft.py b/modules/connectors/providerMsft/connectorMsft.py index bf290eca..49f6fdaa 100644 --- a/modules/connectors/providerMsft/connectorMsft.py +++ b/modules/connectors/providerMsft/connectorMsft.py @@ -126,6 +126,11 @@ def _stripGraphBase(url: str) -> str: def _graphItemToExternalEntry(item: Dict[str, Any], basePath: str = "") -> ExternalEntry: isFolder = "folder" in item + # Graph exposes the driveItem content hash as ``eTag`` (quoted) or + # ``cTag``; we normalise to a "revision" string so callers can use it as a + # stable ``contentVersion`` for idempotent ingestion without re-downloading + # file bytes. + revision = item.get("eTag") or item.get("cTag") return ExternalEntry( name=item.get("name", ""), path=f"{basePath}/{item.get('name', '')}" if basePath else item.get("name", ""), @@ -137,6 +142,9 @@ def _graphItemToExternalEntry(item: Dict[str, Any], basePath: str = "") -> Exter "id": item.get("id"), "webUrl": item.get("webUrl"), "childCount": item.get("folder", {}).get("childCount") if isFolder else None, + "revision": revision, + "lastModifiedDateTime": item.get("lastModifiedDateTime"), + "parentReference": item.get("parentReference", {}), }, ) @@ -167,21 +175,36 @@ class SharepointAdapter(_GraphApiMixin, ServiceAdapter): return await self._discoverSites() if not folderPath or folderPath == "/": - endpoint = f"sites/{siteId}/drive/root/children" + endpoint: Optional[str] = f"sites/{siteId}/drive/root/children?$top=200" else: cleanPath = folderPath.lstrip("/") - endpoint = f"sites/{siteId}/drive/root:/{cleanPath}:/children" + endpoint = 
f"sites/{siteId}/drive/root:/{cleanPath}:/children?$top=200" - result = await self._graphGet(endpoint) - if "error" in result: - logger.warning(f"SharePoint browse failed: {result['error']}") - return [] + # Follow @odata.nextLink until a hard cap is reached so large libraries + # are fully enumerated (required for bootstrap). Per-page size uses + # Graph's max supported value to minimise round-trips. + effectiveLimit = int(limit) if limit is not None else None + items: List[Dict[str, Any]] = [] + hardCap = 5000 + while endpoint and len(items) < hardCap: + result = await self._graphGet(endpoint) + if "error" in result: + logger.warning(f"SharePoint browse failed: {result['error']}") + break + for raw in result.get("value", []) or []: + items.append(raw) + if effectiveLimit is not None and len(items) >= effectiveLimit: + break + if effectiveLimit is not None and len(items) >= effectiveLimit: + break + nextLink = result.get("@odata.nextLink") + endpoint = _stripGraphBase(nextLink) if nextLink else None - entries = [_graphItemToExternalEntry(item, path) for item in result.get("value", [])] + entries = [_graphItemToExternalEntry(item, path) for item in items] if filter: entries = [e for e in entries if _matchFilter(e, filter)] - if limit is not None: - entries = entries[: max(1, int(limit))] + if effectiveLimit is not None: + entries = entries[: max(1, effectiveLimit)] return entries async def _discoverSites(self) -> List[ExternalEntry]: diff --git a/modules/datamodels/datamodelKnowledge.py b/modules/datamodels/datamodelKnowledge.py index 163328a4..d0af2216 100644 --- a/modules/datamodels/datamodelKnowledge.py +++ b/modules/datamodels/datamodelKnowledge.py @@ -90,6 +90,16 @@ class FileContentIndex(PowerOnModel): description="Data visibility scope: personal, featureInstance, mandate, global", json_schema_extra={"label": "Sichtbarkeit"}, ) + sourceKind: str = Field( + default="file", + description="Origin of the indexed content: file, sharepoint_item, outlook_message, 
outlook_attachment, ...", + json_schema_extra={"label": "Quellenart"}, + ) + connectionId: Optional[str] = Field( + default=None, + description="UserConnection ID if this index entry originates from an external connector", + json_schema_extra={"label": "Connection-ID"}, + ) neutralizationStatus: Optional[str] = Field( default=None, description="Neutralization status: completed, failed, skipped, None = not required", diff --git a/modules/interfaces/interfaceDbKnowledge.py b/modules/interfaces/interfaceDbKnowledge.py index f819615e..e5a14147 100644 --- a/modules/interfaces/interfaceDbKnowledge.py +++ b/modules/interfaces/interfaceDbKnowledge.py @@ -93,6 +93,46 @@ class KnowledgeObjects: self.db.recordModify(FileContentIndex, fileId, {"status": status}) return True + def deleteFileContentIndexByConnectionId(self, connectionId: str) -> Dict[str, int]: + """Delete all FileContentIndex rows (and their ContentChunks) for a connection. + + Used when a UserConnection is revoked / disconnected so the knowledge corpus + no longer references data the user no longer grants access to. Returns a dict + with counts to support observability logs. 
+ """ + if not connectionId: + return {"indexRows": 0, "chunks": 0} + + rows = self.db.getRecordset( + FileContentIndex, recordFilter={"connectionId": connectionId} + ) + mandateIds: set = set() + chunkCount = 0 + indexCount = 0 + for row in rows: + fid = row.get("id") if isinstance(row, dict) else getattr(row, "id", None) + mid = row.get("mandateId") if isinstance(row, dict) else getattr(row, "mandateId", "") + if not fid: + continue + chunks = self.db.getRecordset(ContentChunk, recordFilter={"fileId": fid}) + for chunk in chunks: + if self.db.recordDelete(ContentChunk, chunk["id"]): + chunkCount += 1 + if self.db.recordDelete(FileContentIndex, fid): + indexCount += 1 + if mid: + mandateIds.add(str(mid)) + + for mid in mandateIds: + try: + from modules.interfaces.interfaceDbBilling import _getRootInterface + + _getRootInterface().reconcileMandateStorageBilling(mid) + except Exception as ex: + logger.warning("reconcileMandateStorageBilling after connection purge failed: %s", ex) + + return {"indexRows": indexCount, "chunks": chunkCount} + def deleteFileContentIndex(self, fileId: str) -> bool: """Delete a FileContentIndex and all associated ContentChunks.""" existing = self.getFileContentIndex(fileId) diff --git a/modules/routes/routeDataConnections.py b/modules/routes/routeDataConnections.py index 8e7a730d..b8ccf4bf 100644 --- a/modules/routes/routeDataConnections.py +++ b/modules/routes/routeDataConnections.py @@ -586,8 +586,25 @@ def disconnect_service( detail=routeApiMsg("Connection not found") ) - # Update connection status - connection.status = ConnectionStatus.INACTIVE + # Fire revoked event BEFORE DB status change so knowledge purge and + # status mutation form one logical step; subscribers see the + # connection as it was. INACTIVE does not exist on the enum — REVOKED + # is the correct terminal-but-retained state (deleted rows are + # handled in DELETE /{id}). 
+ try: + from modules.shared.callbackRegistry import callbackRegistry + + callbackRegistry.trigger( + "connection.revoked", + connectionId=connectionId, + authority=str(getattr(connection.authority, "value", connection.authority) or ""), + userId=str(currentUser.id), + reason="disconnected", + ) + except Exception as _cbErr: + logger.warning("connection.revoked callback failed for %s: %s", connectionId, _cbErr) + + connection.status = ConnectionStatus.REVOKED connection.lastChecked = getUtcTimestamp() # Update connection record - models now handle timestamp serialization automatically @@ -636,6 +653,23 @@ def delete_connection( detail=routeApiMsg("Connection not found") ) + # Fire revoked event BEFORE the row disappears so consumers still + # have authority/connection context for observability; purge itself + # targets FileContentIndex rows by connectionId which are unaffected + # by the UserConnection delete. + try: + from modules.shared.callbackRegistry import callbackRegistry + + callbackRegistry.trigger( + "connection.revoked", + connectionId=connectionId, + authority=str(getattr(connection.authority, "value", connection.authority) or ""), + userId=str(currentUser.id), + reason="deleted", + ) + except Exception as _cbErr: + logger.warning("connection.revoked callback failed for %s: %s", connectionId, _cbErr) + # Remove the connection - only need connectionId since permissions are verified interface.removeUserConnection(connectionId) diff --git a/modules/routes/routeSecurityClickup.py b/modules/routes/routeSecurityClickup.py index ca787391..698e3ca1 100644 --- a/modules/routes/routeSecurityClickup.py +++ b/modules/routes/routeSecurityClickup.py @@ -241,6 +241,18 @@ async def auth_connect_callback( ) interface.saveConnectionToken(token) + try: + from modules.shared.callbackRegistry import callbackRegistry + + callbackRegistry.trigger( + "connection.established", + connectionId=connection.id, + authority=str(getattr(connection.authority, "value", 
connection.authority) or "clickup"), + userId=str(user.id), + ) + except Exception as _cbErr: + logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr) + return HTMLResponse( content=f""" diff --git a/modules/routes/routeSecurityGoogle.py b/modules/routes/routeSecurityGoogle.py index 523523ee..2b6a70f5 100644 --- a/modules/routes/routeSecurityGoogle.py +++ b/modules/routes/routeSecurityGoogle.py @@ -479,6 +479,18 @@ async def auth_connect_callback( ) interface.saveConnectionToken(token) + try: + from modules.shared.callbackRegistry import callbackRegistry + + callbackRegistry.trigger( + "connection.established", + connectionId=connection.id, + authority=str(getattr(connection.authority, "value", connection.authority) or "google"), + userId=str(user.id), + ) + except Exception as _cbErr: + logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr) + return HTMLResponse( content=f""" diff --git a/modules/routes/routeSecurityMsft.py b/modules/routes/routeSecurityMsft.py index cc4cb87b..e087a44c 100644 --- a/modules/routes/routeSecurityMsft.py +++ b/modules/routes/routeSecurityMsft.py @@ -420,6 +420,18 @@ async def auth_connect_callback( ) interface.saveConnectionToken(token) + try: + from modules.shared.callbackRegistry import callbackRegistry + + callbackRegistry.trigger( + "connection.established", + connectionId=connection.id, + authority=str(getattr(connection.authority, "value", connection.authority) or "msft"), + userId=str(user.id), + ) + except Exception as _cbErr: + logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr) + return HTMLResponse( content=f""" diff --git a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py index 57490eba..0267e2fd 100644 --- a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py +++ 
b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py @@ -203,6 +203,8 @@ class KnowledgeService: contentObjects=job.contentObjects or [], structure=structure, containerPath=job.containerPath, + sourceKind=job.sourceKind, + connectionId=(job.provenance or {}).get("connectionId"), ) except Exception as exc: logger.error( @@ -254,6 +256,31 @@ class KnowledgeService: index=index, ) + def purgeConnection(self, connectionId: str) -> Dict[str, int]: + """Delete every FileContentIndex + ContentChunk linked to a UserConnection. + + Called on `connection.revoked` events so the knowledge corpus never + holds chunks the user has withdrawn access to. Returns deletion counts + for observability. + """ + if not connectionId: + return {"indexRows": 0, "chunks": 0} + startMs = time.time() + result = self._knowledgeDb.deleteFileContentIndexByConnectionId(connectionId) + logger.info( + "ingestion.connection.purged connectionId=%s rows=%d chunks=%d durationMs=%d", + connectionId, result["indexRows"], result["chunks"], + int((time.time() - startMs) * 1000), + extra={ + "event": "ingestion.connection.purged", + "connectionId": connectionId, + "indexRows": result["indexRows"], + "chunks": result["chunks"], + "durationMs": int((time.time() - startMs) * 1000), + }, + ) + return result + def getIngestionStatus( self, handleOrJobId: Union[IngestionHandle, str] ) -> Dict[str, Any]: @@ -362,6 +389,8 @@ class KnowledgeService: contentObjects: List[Dict[str, Any]] = None, structure: Dict[str, Any] = None, containerPath: str = None, + sourceKind: str = "file", + connectionId: Optional[str] = None, ) -> FileContentIndex: """Index a file's content objects and create embeddings for text chunks. @@ -384,39 +413,41 @@ class KnowledgeService: """ contentObjects = contentObjects or [] - # 1. 
Resolve scope fields from FileItem (Single Source of Truth) - # FileItem lives in poweron_management; its scope/mandateId/featureInstanceId - # are authoritative and must be mirrored onto the FileContentIndex. + # 1. Resolve scope fields from FileItem (Single Source of Truth) for + # uploaded files. Connector-sourced ingestion (sharepoint_item, + # outlook_message, ...) has no FileItem row — trust the caller's + # scope + ids directly. resolvedScope = "personal" resolvedMandateId = mandateId resolvedFeatureInstanceId = featureInstanceId resolvedUserId = userId _shouldNeutralize = False - try: - from modules.datamodels.datamodelFiles import FileItem as _FileItem - _dbComponent = getattr(self._context, "interfaceDbComponent", None) - _fileRecords = _dbComponent.getRecordset(_FileItem, recordFilter={"id": fileId}) if _dbComponent else [] - if not _fileRecords: - from modules.interfaces.interfaceDbManagement import ComponentObjects - _row = ComponentObjects().db._loadRecord(_FileItem, fileId) - if _row: - _fileRecords = [_row] - if _fileRecords: - _fileRecord = _fileRecords[0] - _get = (lambda k, d=None: _fileRecord.get(k, d)) if isinstance(_fileRecord, dict) else (lambda k, d=None: getattr(_fileRecord, k, d)) - _shouldNeutralize = bool(_get("neutralize", False)) - _fileScope = _get("scope") - if _fileScope: - resolvedScope = _fileScope - if not resolvedMandateId: - resolvedMandateId = str(_get("mandateId", "") or "") - if not resolvedFeatureInstanceId: - resolvedFeatureInstanceId = str(_get("featureInstanceId", "") or "") - _fileCreatedBy = _get("sysCreatedBy") - if _fileCreatedBy: - resolvedUserId = str(_fileCreatedBy) - except Exception: - pass + if sourceKind == "file": + try: + from modules.datamodels.datamodelFiles import FileItem as _FileItem + _dbComponent = getattr(self._context, "interfaceDbComponent", None) + _fileRecords = _dbComponent.getRecordset(_FileItem, recordFilter={"id": fileId}) if _dbComponent else [] + if not _fileRecords: + from 
modules.interfaces.interfaceDbManagement import ComponentObjects + _row = ComponentObjects().db._loadRecord(_FileItem, fileId) + if _row: + _fileRecords = [_row] + if _fileRecords: + _fileRecord = _fileRecords[0] + _get = (lambda k, d=None: _fileRecord.get(k, d)) if isinstance(_fileRecord, dict) else (lambda k, d=None: getattr(_fileRecord, k, d)) + _shouldNeutralize = bool(_get("neutralize", False)) + _fileScope = _get("scope") + if _fileScope: + resolvedScope = _fileScope + if not resolvedMandateId: + resolvedMandateId = str(_get("mandateId", "") or "") + if not resolvedFeatureInstanceId: + resolvedFeatureInstanceId = str(_get("featureInstanceId", "") or "") + _fileCreatedBy = _get("sysCreatedBy") + if _fileCreatedBy: + resolvedUserId = str(_fileCreatedBy) + except Exception: + pass # 2. Create FileContentIndex with correct scope from the start index = FileContentIndex( @@ -425,6 +456,8 @@ class KnowledgeService: featureInstanceId=resolvedFeatureInstanceId, mandateId=resolvedMandateId, scope=resolvedScope, + sourceKind=sourceKind, + connectionId=connectionId, fileName=fileName, mimeType=mimeType, containerPath=containerPath, @@ -601,7 +634,12 @@ class KnowledgeService: Formatted context string for injection into the agent's system prompt. 
""" queryVector = await self._embedSingle(currentPrompt) + logger.debug( + "buildAgentContext.start userId=%s featureInstanceId=%s mandateId=%s isSysAdmin=%s prompt=%r", + userId, featureInstanceId, mandateId, isSysAdmin, (currentPrompt or "")[:120], + ) if not queryVector: + logger.debug("buildAgentContext.abort reason=no_query_vector") return "" builder = _ContextBuilder(budget=contextBudget) @@ -628,9 +666,14 @@ class KnowledgeService: featureInstanceId=featureInstanceId, mandateId=mandateId, limit=15, - minScore=0.65, + minScore=0.35, isSysAdmin=isSysAdmin, ) + logger.debug( + "buildAgentContext.layer1 instanceChunks=%d top_scores=%s", + len(instanceChunks), + [round(float(c.get("_score", 0) or 0), 3) for c in (instanceChunks or [])[:3]], + ) if instanceChunks: builder.add(priority=1, label="Relevant Documents", items=instanceChunks, maxChars=4000) @@ -639,7 +682,7 @@ class KnowledgeService: queryVector=queryVector, workflowId=workflowId, limit=10, - minScore=0.55, + minScore=0.35, ) if roundMemories: memItems = [] @@ -677,7 +720,7 @@ class KnowledgeService: scope="mandate", mandateId=mandateId, limit=10, - minScore=0.7, + minScore=0.35, isSysAdmin=isSysAdmin, ) if mandateChunks: @@ -693,7 +736,12 @@ class KnowledgeService: maxChars=500, ) - return builder.build() + _result = builder.build() + logger.debug( + "buildAgentContext.done totalChars=%d userId=%s", + len(_result), userId, + ) + return _result # ========================================================================= # Workflow Memory diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py new file mode 100644 index 00000000..51acb71c --- /dev/null +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py @@ -0,0 +1,196 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Connection-lifecycle consumer bridging OAuth events to ingestion jobs. 
+ +Subscribes to `connection.established` and `connection.revoked` callbacks +emitted by the OAuth callbacks / connection management routes and dispatches: + +- `connection.established` -> enqueue a `connection.bootstrap` BackgroundJob + that walks the connector and ingests all reachable items via + KnowledgeService.requestIngestion (file-like or virtual documents). +- `connection.revoked` -> run `KnowledgeService.purgeConnection` synchronously + so the knowledge corpus releases the data before the UI confirms the revoke. + +The consumer is registered once at process boot (see `app.py` lifespan). +It intentionally does NOT hold a per-user service context; each callback +creates whatever context it needs from the UserConnection row itself. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any, Dict, Optional + +from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface +from modules.shared.callbackRegistry import callbackRegistry +from modules.serviceCenter.services.serviceBackgroundJobs import ( + registerJobHandler, + startJob, +) + +logger = logging.getLogger(__name__) + +BOOTSTRAP_JOB_TYPE = "connection.bootstrap" + +_registered = False + + +def _onConnectionEstablished( + *, + connectionId: str, + authority: str, + userId: Optional[str] = None, + **kwargs: Any, +) -> None: + """Fire-and-forget bootstrap enqueue for a freshly connected UserConnection.""" + if not connectionId: + logger.warning("connection.established without connectionId; ignoring") + return + payload: Dict[str, Any] = { + "connectionId": connectionId, + "authority": (authority or "").lower(), + "userId": userId, + } + logger.info( + "ingestion.connection.bootstrap.queued connectionId=%s authority=%s", + connectionId, authority, + extra={ + "event": "ingestion.connection.bootstrap.queued", + "connectionId": connectionId, + "authority": authority, + }, + ) + + async def _enqueue() -> None: + try: + await startJob( + 
BOOTSTRAP_JOB_TYPE, + payload, + triggeredBy=userId, + ) + except Exception as exc: + logger.error( + "ingestion.connection.bootstrap.enqueue_failed connectionId=%s error=%s", + connectionId, exc, exc_info=True, + ) + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + loop.create_task(_enqueue()) + else: + loop.run_until_complete(_enqueue()) + except RuntimeError: + asyncio.run(_enqueue()) + + +def _onConnectionRevoked( + *, + connectionId: str, + authority: Optional[str] = None, + userId: Optional[str] = None, + reason: Optional[str] = None, + **kwargs: Any, +) -> None: + """Run the knowledge purge synchronously so UI feedback is authoritative.""" + if not connectionId: + logger.warning("connection.revoked without connectionId; ignoring") + return + try: + # Purge lives on the DB interface to avoid ServiceCenter/user-context + # plumbing here; the service method is a thin wrapper on top of this. + result = getKnowledgeInterface(None).deleteFileContentIndexByConnectionId(connectionId) + except Exception as exc: + logger.error( + "ingestion.connection.purged.failed connectionId=%s error=%s", + connectionId, exc, exc_info=True, + ) + return + logger.info( + "ingestion.connection.purged connectionId=%s authority=%s reason=%s rows=%d chunks=%d", + connectionId, authority, reason, + result.get("indexRows", 0), result.get("chunks", 0), + extra={ + "event": "ingestion.connection.purged", + "connectionId": connectionId, + "authority": authority, + "reason": reason, + "indexRows": result.get("indexRows", 0), + "chunks": result.get("chunks", 0), + }, + ) + + +async def _bootstrapJobHandler( + job: Dict[str, Any], + progressCb, +) -> Dict[str, Any]: + """Dispatch bootstrap by authority. 
Each authority runs its own sub-bootstraps.""" + payload = job.get("payload") or {} + connectionId = payload.get("connectionId") + authority = (payload.get("authority") or "").lower() + if not connectionId: + raise ValueError("connection.bootstrap requires payload.connectionId") + + progressCb(5, f"resolving {authority} connection") + + if authority == "msft": + from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint import ( + bootstrapSharepoint, + ) + from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncOutlook import ( + bootstrapOutlook, + ) + + progressCb(10, "sharepoint + outlook") + spResult, olResult = await asyncio.gather( + bootstrapSharepoint(connectionId=connectionId, progressCb=progressCb), + bootstrapOutlook(connectionId=connectionId, progressCb=progressCb), + return_exceptions=True, + ) + + def _normalize(res: Any, label: str) -> Dict[str, Any]: + if isinstance(res, Exception): + logger.error( + "ingestion.connection.bootstrap.failed part=%s connectionId=%s error=%s", + label, connectionId, res, exc_info=res, + ) + return {"error": str(res)} + return res or {} + + return { + "connectionId": connectionId, + "authority": authority, + "sharepoint": _normalize(spResult, "sharepoint"), + "outlook": _normalize(olResult, "outlook"), + } + + logger.info( + "ingestion.connection.bootstrap.skipped reason=P1_pilot_scope authority=%s connectionId=%s", + authority, connectionId, + extra={ + "event": "ingestion.connection.bootstrap.skipped", + "authority": authority, + "connectionId": connectionId, + "reason": "P1_pilot_scope", + }, + ) + return { + "connectionId": connectionId, + "authority": authority, + "skipped": True, + "reason": "P1_pilot_scope", + } + + +def registerKnowledgeIngestionConsumer() -> None: + """Register callback subscribers + background job handler. 
Idempotent.""" + global _registered + if _registered: + return + callbackRegistry.register("connection.established", _onConnectionEstablished) + callbackRegistry.register("connection.revoked", _onConnectionRevoked) + registerJobHandler(BOOTSTRAP_JOB_TYPE, _bootstrapJobHandler) + _registered = True + logger.info("KnowledgeIngestionConsumer registered (established/revoked + %s handler)", BOOTSTRAP_JOB_TYPE) diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py new file mode 100644 index 00000000..b3f425ac --- /dev/null +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py @@ -0,0 +1,551 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Outlook bootstrap for the unified knowledge ingestion lane. + +Unlike SharePoint, Outlook messages are "virtual documents" — we never persist +file bytes in the store. Each message becomes a `sourceKind="outlook_message"` +IngestionJob whose `contentObjects` carry the header, snippet and cleaned body +so retrieval can show a compact answer without fetching Graph again. + +Attachments are optional (`includeAttachments` limit flag) and enqueued as +child jobs with `sourceKind="outlook_attachment"` + `provenance.parentId`. 
+""" + +from __future__ import annotations + +import asyncio +import hashlib +import logging +import time +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + +from modules.serviceCenter.services.serviceKnowledge.subTextClean import cleanEmailBody + +logger = logging.getLogger(__name__) + +MAX_MESSAGES_DEFAULT = 500 +MAX_FOLDERS_DEFAULT = 5 +MAX_BODY_CHARS_DEFAULT = 8000 +MAX_ATTACHMENT_BYTES_DEFAULT = 10 * 1024 * 1024 +WELL_KNOWN_FOLDERS = ("inbox", "sentitems") + + +@dataclass +class OutlookBootstrapLimits: + maxMessages: int = MAX_MESSAGES_DEFAULT + maxFolders: int = MAX_FOLDERS_DEFAULT + maxBodyChars: int = MAX_BODY_CHARS_DEFAULT + includeAttachments: bool = False + maxAttachmentBytes: int = MAX_ATTACHMENT_BYTES_DEFAULT + # Only fetch messages newer than N days. None disables filter. + maxAgeDays: Optional[int] = 90 + + +@dataclass +class OutlookBootstrapResult: + connectionId: str + indexed: int = 0 + skippedDuplicate: int = 0 + skippedPolicy: int = 0 + failed: int = 0 + attachmentsIndexed: int = 0 + errors: List[str] = field(default_factory=list) + + +def _syntheticMessageId(connectionId: str, messageId: str) -> str: + token = hashlib.sha256(f"{connectionId}:{messageId}".encode("utf-8")).hexdigest()[:16] + return f"om:{connectionId[:8]}:{token}" + + +def _syntheticAttachmentId(connectionId: str, messageId: str, attachmentId: str) -> str: + token = hashlib.sha256( + f"{connectionId}:{messageId}:{attachmentId}".encode("utf-8") + ).hexdigest()[:16] + return f"oa:{connectionId[:8]}:{token}" + + +def _extractRecipient(recipient: Dict[str, Any]) -> str: + email = (recipient or {}).get("emailAddress") or {} + name = email.get("name") or "" + addr = email.get("address") or "" + if name and addr: + return f"{name} <{addr}>" + return addr or name + + +def _joinRecipients(recipients: List[Dict[str, Any]]) -> str: + return ", ".join(filter(None, [_extractRecipient(r) for r in recipients or []])) + + +def 
_buildContentObjects(message: Dict[str, Any], maxBodyChars: int) -> List[Dict[str, Any]]: + subject = message.get("subject") or "(no subject)" + fromAddr = _extractRecipient(message.get("from") or {}) + toAddr = _joinRecipients(message.get("toRecipients") or []) + ccAddr = _joinRecipients(message.get("ccRecipients") or []) + received = message.get("receivedDateTime") or "" + snippet = message.get("bodyPreview") or "" + + body = message.get("body") or {} + bodyContent = body.get("content") or "" + bodyType = (body.get("contentType") or "").lower() + if bodyType == "html" or (bodyContent and "<" in bodyContent and ">" in bodyContent): + cleanedBody = cleanEmailBody(bodyContent, maxChars=maxBodyChars) + else: + cleanedBody = cleanEmailBody(bodyContent, maxChars=maxBodyChars) if bodyContent else "" + + parts: List[Dict[str, Any]] = [] + header = ( + f"Subject: {subject}\n" + f"From: {fromAddr}\n" + f"To: {toAddr}\n" + + (f"Cc: {ccAddr}\n" if ccAddr else "") + + f"Date: {received}" + ) + parts.append({ + "contentObjectId": "header", + "contentType": "text", + "data": header, + "contextRef": {"part": "header"}, + }) + if snippet: + parts.append({ + "contentObjectId": "snippet", + "contentType": "text", + "data": snippet, + "contextRef": {"part": "snippet"}, + }) + if cleanedBody: + parts.append({ + "contentObjectId": "body", + "contentType": "text", + "data": cleanedBody, + "contextRef": {"part": "body"}, + }) + return parts + + +async def bootstrapOutlook( + connectionId: str, + *, + progressCb: Optional[Callable[[int, Optional[str]], None]] = None, + adapter: Any = None, + connection: Any = None, + knowledgeService: Any = None, + limits: Optional[OutlookBootstrapLimits] = None, +) -> Dict[str, Any]: + """Enumerate Outlook folders (inbox + sent by default) and ingest messages.""" + limits = limits or OutlookBootstrapLimits() + startMs = time.time() + result = OutlookBootstrapResult(connectionId=connectionId) + + logger.info( + "ingestion.connection.bootstrap.started 
part=outlook connectionId=%s", + connectionId, + extra={ + "event": "ingestion.connection.bootstrap.started", + "part": "outlook", + "connectionId": connectionId, + }, + ) + + if adapter is None or knowledgeService is None or connection is None: + adapter, connection, knowledgeService = await _resolveDependencies(connectionId) + + mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else "" + userId = str(getattr(connection, "userId", "") or "") if connection is not None else "" + + folderIds = await _selectFolderIds(adapter, limits) + for folderId in folderIds: + if result.indexed + result.skippedDuplicate >= limits.maxMessages: + break + try: + await _ingestFolder( + adapter=adapter, + knowledgeService=knowledgeService, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + folderId=folderId, + limits=limits, + result=result, + progressCb=progressCb, + ) + except Exception as exc: + logger.error("outlook ingestion folder %s failed: %s", folderId, exc, exc_info=True) + result.errors.append(f"folder({folderId}): {exc}") + + return _finalizeResult(connectionId, result, startMs) + + +async def _resolveDependencies(connectionId: str): + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.auth import TokenManager + from modules.connectors.providerMsft.connectorMsft import MsftConnector + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext + from modules.security.rootAccess import getRootUser + + rootInterface = getRootInterface() + connection = rootInterface.getUserConnectionById(connectionId) + if connection is None: + raise ValueError(f"UserConnection not found: {connectionId}") + + token = TokenManager().getFreshToken(connectionId) + if not token or not token.tokenAccess: + raise ValueError(f"No valid token for connection {connectionId}") + + provider = MsftConnector(connection, token.tokenAccess) + adapter = 
provider.getServiceAdapter("outlook") + + rootUser = getRootUser() + ctx = ServiceCenterContext( + user=rootUser, + mandate_id=str(getattr(connection, "mandateId", "") or ""), + ) + knowledgeService = getService("knowledge", ctx) + return adapter, connection, knowledgeService + + +async def _selectFolderIds(adapter, limits: OutlookBootstrapLimits) -> List[str]: + """Prefer well-known folders (inbox, sentitems); fall back to browse().""" + folderIds: List[str] = [] + for wellKnown in WELL_KNOWN_FOLDERS: + if len(folderIds) >= limits.maxFolders: + break + try: + row = await adapter._graphGet(f"me/mailFolders/{wellKnown}") + except Exception: + row = None + if isinstance(row, dict) and "error" not in row and row.get("id"): + folderIds.append(row["id"]) + + if len(folderIds) < limits.maxFolders: + try: + entries = await adapter.browse("/") + except Exception: + entries = [] + for entry in entries: + metadata = getattr(entry, "metadata", {}) or {} + fid = metadata.get("id") + if fid and fid not in folderIds: + folderIds.append(fid) + if len(folderIds) >= limits.maxFolders: + break + return folderIds + + +async def _ingestFolder( + *, + adapter, + knowledgeService, + connectionId: str, + mandateId: str, + userId: str, + folderId: str, + limits: OutlookBootstrapLimits, + result: OutlookBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + remaining = limits.maxMessages - (result.indexed + result.skippedDuplicate) + if remaining <= 0: + return + + pageSize = min(100, remaining) + select = ( + "id,subject,from,toRecipients,ccRecipients,receivedDateTime," + "bodyPreview,body,internetMessageId,hasAttachments,changeKey" + ) + endpoint: Optional[str] = ( + f"me/mailFolders/{folderId}/messages" + f"?$top={pageSize}&$orderby=receivedDateTime desc&$select={select}" + ) + + # Keep header-based age filter in Graph itself to avoid shipping ancient + # messages we'd discard client-side. 
+ if limits.maxAgeDays: + from datetime import datetime, timezone, timedelta + + cutoff = datetime.now(timezone.utc) - timedelta(days=limits.maxAgeDays) + cutoffIso = cutoff.strftime("%Y-%m-%dT%H:%M:%SZ") + endpoint = f"{endpoint}&$filter=receivedDateTime ge {cutoffIso}" + + while endpoint and (result.indexed + result.skippedDuplicate) < limits.maxMessages: + try: + page = await adapter._graphGet(endpoint) + except Exception as exc: + logger.warning("outlook graph page failed for folder %s: %s", folderId, exc) + result.errors.append(f"graph({folderId}): {exc}") + return + if not isinstance(page, dict) or "error" in page: + err = (page or {}).get("error") if isinstance(page, dict) else "unknown" + logger.warning("outlook graph page error for folder %s: %s", folderId, err) + result.errors.append(f"graph({folderId}): {err}") + return + + for message in page.get("value", []) or []: + if result.indexed + result.skippedDuplicate >= limits.maxMessages: + break + await _ingestMessage( + adapter=adapter, + knowledgeService=knowledgeService, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + message=message, + limits=limits, + result=result, + progressCb=progressCb, + ) + + nextLink = page.get("@odata.nextLink") + if not nextLink: + break + # Strip Graph base so adapter._graphGet accepts the relative path. 
+ from modules.connectors.providerMsft.connectorMsft import _stripGraphBase + + endpoint = _stripGraphBase(nextLink) + + +async def _ingestMessage( + *, + adapter, + knowledgeService, + connectionId: str, + mandateId: str, + userId: str, + message: Dict[str, Any], + limits: OutlookBootstrapLimits, + result: OutlookBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob + + messageId = message.get("id") + if not messageId: + result.skippedPolicy += 1 + return + revision = message.get("changeKey") or message.get("internetMessageId") + subject = message.get("subject") or "(no subject)" + syntheticId = _syntheticMessageId(connectionId, messageId) + fileName = f"{subject[:80].strip()}.eml" if subject else f"{messageId}.eml" + + contentObjects = _buildContentObjects(message, limits.maxBodyChars) + # Always at least the header is emitted, so `contentObjects` is non-empty. 
+ try: + handle = await knowledgeService.requestIngestion( + IngestionJob( + sourceKind="outlook_message", + sourceId=syntheticId, + fileName=fileName, + mimeType="message/rfc822", + userId=userId, + mandateId=mandateId, + contentObjects=contentObjects, + contentVersion=revision, + provenance={ + "connectionId": connectionId, + "authority": "msft", + "service": "outlook", + "externalItemId": messageId, + "internetMessageId": message.get("internetMessageId"), + "tier": "body", + }, + ) + ) + except Exception as exc: + logger.error("outlook ingestion %s failed: %s", messageId, exc, exc_info=True) + result.failed += 1 + result.errors.append(f"ingest({messageId}): {exc}") + return + + if handle.status == "duplicate": + result.skippedDuplicate += 1 + elif handle.status == "indexed": + result.indexed += 1 + else: + result.failed += 1 + + if limits.includeAttachments and message.get("hasAttachments"): + try: + await _ingestAttachments( + adapter=adapter, + knowledgeService=knowledgeService, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + messageId=messageId, + parentSyntheticId=syntheticId, + limits=limits, + result=result, + ) + except Exception as exc: + logger.warning("outlook attachments %s failed: %s", messageId, exc) + result.errors.append(f"attachments({messageId}): {exc}") + + if progressCb is not None and (result.indexed + result.skippedDuplicate) % 50 == 0: + processed = result.indexed + result.skippedDuplicate + try: + progressCb( + min(90, 10 + int(80 * processed / max(1, limits.maxMessages))), + f"outlook processed={processed}", + ) + except Exception: + pass + logger.info( + "ingestion.connection.bootstrap.progress part=outlook processed=%d skippedDup=%d failed=%d", + processed, result.skippedDuplicate, result.failed, + extra={ + "event": "ingestion.connection.bootstrap.progress", + "part": "outlook", + "connectionId": connectionId, + "processed": processed, + "skippedDup": result.skippedDuplicate, + "failed": result.failed, + }, + ) + 
+ await asyncio.sleep(0) + + +async def _ingestAttachments( + *, + adapter, + knowledgeService, + connectionId: str, + mandateId: str, + userId: str, + messageId: str, + parentSyntheticId: str, + limits: OutlookBootstrapLimits, + result: OutlookBootstrapResult, +) -> None: + """Child ingestion jobs for file attachments (skip inline & oversized).""" + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob + from modules.datamodels.datamodelExtraction import ExtractionOptions + from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction + from modules.serviceCenter.services.serviceExtraction.subRegistry import ( + ExtractorRegistry, ChunkerRegistry, + ) + import base64 + + page = await adapter._graphGet(f"me/messages/{messageId}/attachments") + if not isinstance(page, dict) or "error" in page: + return + + extractorRegistry = ExtractorRegistry() + chunkerRegistry = ChunkerRegistry() + + for attachment in page.get("value", []) or []: + if attachment.get("@odata.type") != "#microsoft.graph.fileAttachment": + continue + if attachment.get("isInline"): + continue + size = int(attachment.get("size") or 0) + if size and size > limits.maxAttachmentBytes: + result.skippedPolicy += 1 + continue + contentBytesB64 = attachment.get("contentBytes") + if not contentBytesB64: + continue + try: + rawBytes = base64.b64decode(contentBytesB64) + except Exception: + result.skippedPolicy += 1 + continue + fileName = attachment.get("name") or "attachment" + mimeType = attachment.get("contentType") or "application/octet-stream" + attachmentId = attachment.get("id") or fileName + syntheticId = _syntheticAttachmentId(connectionId, messageId, attachmentId) + + try: + extracted = runExtraction( + extractorRegistry, chunkerRegistry, + rawBytes, fileName, mimeType, + ExtractionOptions(mergeStrategy=None), + ) + except Exception as exc: + logger.warning("outlook attachment extract %s failed: %s", attachmentId, exc) + result.failed 
+= 1 + continue + + contentObjects: List[Dict[str, Any]] = [] + for part in getattr(extracted, "parts", None) or []: + data = getattr(part, "data", None) or "" + if not data or not str(data).strip(): + continue + typeGroup = getattr(part, "typeGroup", "text") or "text" + contentType = "text" + if typeGroup == "image": + contentType = "image" + elif typeGroup in ("binary", "container"): + contentType = "other" + contentObjects.append({ + "contentObjectId": getattr(part, "id", ""), + "contentType": contentType, + "data": data, + "contextRef": { + "containerPath": fileName, + "location": getattr(part, "label", None) or "attachment", + **(getattr(part, "metadata", None) or {}), + }, + }) + if not contentObjects: + result.skippedPolicy += 1 + continue + + try: + await knowledgeService.requestIngestion( + IngestionJob( + sourceKind="outlook_attachment", + sourceId=syntheticId, + fileName=fileName, + mimeType=mimeType, + userId=userId, + mandateId=mandateId, + contentObjects=contentObjects, + provenance={ + "connectionId": connectionId, + "authority": "msft", + "service": "outlook", + "parentId": parentSyntheticId, + "externalItemId": attachmentId, + "parentMessageId": messageId, + }, + ) + ) + result.attachmentsIndexed += 1 + except Exception as exc: + logger.warning("outlook attachment ingest %s failed: %s", attachmentId, exc) + result.failed += 1 + + +def _finalizeResult(connectionId: str, result: OutlookBootstrapResult, startMs: float) -> Dict[str, Any]: + durationMs = int((time.time() - startMs) * 1000) + logger.info( + "ingestion.connection.bootstrap.done part=outlook connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d attachments=%d failed=%d durationMs=%d", + connectionId, + result.indexed, result.skippedDuplicate, result.skippedPolicy, + result.attachmentsIndexed, result.failed, durationMs, + extra={ + "event": "ingestion.connection.bootstrap.done", + "part": "outlook", + "connectionId": connectionId, + "indexed": result.indexed, + "skippedDup": 
result.skippedDuplicate, + "skippedPolicy": result.skippedPolicy, + "attachmentsIndexed": result.attachmentsIndexed, + "failed": result.failed, + "durationMs": durationMs, + }, + ) + return { + "connectionId": result.connectionId, + "indexed": result.indexed, + "skippedDuplicate": result.skippedDuplicate, + "skippedPolicy": result.skippedPolicy, + "attachmentsIndexed": result.attachmentsIndexed, + "failed": result.failed, + "durationMs": durationMs, + "errors": result.errors[:20], + } diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py new file mode 100644 index 00000000..0bceecac --- /dev/null +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py @@ -0,0 +1,425 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""SharePoint bootstrap for the unified knowledge ingestion lane. + +Walks the SharePoint drive(s) reachable via a UserConnection, downloads each +file-like item, runs the standard content extraction pipeline and hands the +result to `KnowledgeService.requestIngestion`. Idempotency is provided by the +ingestion façade itself; repeat bootstraps therefore produce +`ingestion.skipped.duplicate` for every unchanged item because we pass the +Graph `eTag` as `contentVersion`. 
+""" + +from __future__ import annotations + +import asyncio +import hashlib +import logging +import time +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + +from modules.datamodels.datamodelExtraction import ExtractionOptions + +logger = logging.getLogger(__name__) + +MAX_ITEMS_DEFAULT = 500 +MAX_BYTES_DEFAULT = 200 * 1024 * 1024 +MAX_FILE_SIZE_DEFAULT = 25 * 1024 * 1024 +SKIP_MIME_PREFIXES_DEFAULT = ("video/", "audio/") +MAX_DEPTH_DEFAULT = 4 +MAX_SITES_DEFAULT = 3 + + +@dataclass +class SharepointBootstrapLimits: + maxItems: int = MAX_ITEMS_DEFAULT + maxBytes: int = MAX_BYTES_DEFAULT + maxFileSize: int = MAX_FILE_SIZE_DEFAULT + skipMimePrefixes: tuple = SKIP_MIME_PREFIXES_DEFAULT + maxDepth: int = MAX_DEPTH_DEFAULT + maxSites: int = MAX_SITES_DEFAULT + + +@dataclass +class SharepointBootstrapResult: + connectionId: str + indexed: int = 0 + skippedDuplicate: int = 0 + skippedPolicy: int = 0 + failed: int = 0 + bytesProcessed: int = 0 + errors: List[str] = field(default_factory=list) + + +def _syntheticFileId(connectionId: str, externalItemId: str) -> str: + """Deterministic synthetic FileContentIndex id for a SharePoint item. + + Stable across bootstraps → idempotency works; independent of file name so + moves/renames don't duplicate chunks. 
+ """ + token = hashlib.sha256(f"{connectionId}:{externalItemId}".encode("utf-8")).hexdigest()[:16] + return f"sp:{connectionId[:8]}:{token}" + + +def _toContentObjects(extracted, fileName: str) -> List[Dict[str, Any]]: + """Translate ExtractionResult → content objects accepted by requestIngestion.""" + parts = getattr(extracted, "parts", None) or [] + out: List[Dict[str, Any]] = [] + for part in parts: + data = getattr(part, "data", None) or "" + if not data or not str(data).strip(): + continue + typeGroup = getattr(part, "typeGroup", "text") or "text" + contentType = "text" + if typeGroup == "image": + contentType = "image" + elif typeGroup in ("binary", "container"): + contentType = "other" + out.append({ + "contentObjectId": getattr(part, "id", ""), + "contentType": contentType, + "data": data, + "contextRef": { + "containerPath": fileName, + "location": getattr(part, "label", None) or "file", + **(getattr(part, "metadata", None) or {}), + }, + }) + return out + + +async def bootstrapSharepoint( + connectionId: str, + *, + progressCb: Optional[Callable[[int, Optional[str]], None]] = None, + adapter: Any = None, + connection: Any = None, + knowledgeService: Any = None, + limits: Optional[SharepointBootstrapLimits] = None, + runExtractionFn: Optional[Callable[..., Any]] = None, +) -> Dict[str, Any]: + """Enumerate SharePoint drives and ingest every reachable file via the façade. + + Parameters allow injection for tests; production callers pass only + `connectionId` (and optionally a progressCb) and everything else is + resolved against the registered services. 
+ """ + limits = limits or SharepointBootstrapLimits() + startMs = time.time() + result = SharepointBootstrapResult(connectionId=connectionId) + + logger.info( + "ingestion.connection.bootstrap.started part=sharepoint connectionId=%s", + connectionId, + extra={ + "event": "ingestion.connection.bootstrap.started", + "part": "sharepoint", + "connectionId": connectionId, + }, + ) + + if adapter is None or knowledgeService is None or connection is None: + adapter, connection, knowledgeService = await _resolveDependencies(connectionId) + if runExtractionFn is None: + from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction + from modules.serviceCenter.services.serviceExtraction.subRegistry import ( + ExtractorRegistry, ChunkerRegistry, + ) + extractorRegistry = ExtractorRegistry() + chunkerRegistry = ChunkerRegistry() + + def runExtractionFn(bytesData, name, mime, options): # type: ignore[no-redef] + return runExtraction(extractorRegistry, chunkerRegistry, bytesData, name, mime, options) + + mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else "" + userId = str(getattr(connection, "userId", "") or "") if connection is not None else "" + + try: + sites = await adapter.browse("/", limit=limits.maxSites) + except Exception as exc: + logger.error("sharepoint site discovery failed for %s: %s", connectionId, exc, exc_info=True) + result.errors.append(f"site_discovery: {exc}") + return _finalizeResult(connectionId, result, startMs) + + for site in sites[: limits.maxSites]: + if result.indexed + result.skippedDuplicate >= limits.maxItems: + break + sitePath = getattr(site, "path", "") or "" + try: + await _walkFolder( + adapter=adapter, + knowledgeService=knowledgeService, + runExtractionFn=runExtractionFn, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + folderPath=sitePath, + depth=0, + limits=limits, + result=result, + progressCb=progressCb, + ) + except Exception as exc: + 
logger.error("sharepoint walk failed for site %s: %s", sitePath, exc, exc_info=True) + result.errors.append(f"walk({sitePath}): {exc}") + + return _finalizeResult(connectionId, result, startMs) + + +async def _resolveDependencies(connectionId: str): + """Load connection, instantiate SharepointAdapter, and build a KnowledgeService. + + Runs with root privileges: bootstrap is a system operation triggered by an + authenticated user via callback; it must not be gated by a per-user + service-center context. + """ + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.auth import TokenManager + from modules.connectors.providerMsft.connectorMsft import MsftConnector + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext + from modules.security.rootAccess import getRootUser + + rootInterface = getRootInterface() + connection = rootInterface.getUserConnectionById(connectionId) + if connection is None: + raise ValueError(f"UserConnection not found: {connectionId}") + + token = TokenManager().getFreshToken(connectionId) + if not token or not token.tokenAccess: + raise ValueError(f"No valid token for connection {connectionId}") + + provider = MsftConnector(connection, token.tokenAccess) + adapter = provider.getServiceAdapter("sharepoint") + + rootUser = getRootUser() + ctx = ServiceCenterContext( + user=rootUser, + mandate_id=str(getattr(connection, "mandateId", "") or ""), + ) + knowledgeService = getService("knowledge", ctx) + return adapter, connection, knowledgeService + + +async def _walkFolder( + *, + adapter, + knowledgeService, + runExtractionFn, + connectionId: str, + mandateId: str, + userId: str, + folderPath: str, + depth: int, + limits: SharepointBootstrapLimits, + result: SharepointBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + if depth > limits.maxDepth: + return + try: + entries = await adapter.browse(folderPath) + except Exception as 
exc: + logger.warning("sharepoint browse %s failed: %s", folderPath, exc) + result.errors.append(f"browse({folderPath}): {exc}") + return + + for entry in entries: + if result.indexed + result.skippedDuplicate >= limits.maxItems: + return + if result.bytesProcessed >= limits.maxBytes: + return + + entryPath = getattr(entry, "path", "") or "" + if getattr(entry, "isFolder", False): + await _walkFolder( + adapter=adapter, + knowledgeService=knowledgeService, + runExtractionFn=runExtractionFn, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + folderPath=entryPath, + depth=depth + 1, + limits=limits, + result=result, + progressCb=progressCb, + ) + continue + + mimeType = getattr(entry, "mimeType", None) or "application/octet-stream" + if any(mimeType.startswith(prefix) for prefix in limits.skipMimePrefixes): + result.skippedPolicy += 1 + continue + size = int(getattr(entry, "size", 0) or 0) + if size and size > limits.maxFileSize: + result.skippedPolicy += 1 + continue + + metadata = getattr(entry, "metadata", {}) or {} + externalItemId = metadata.get("id") or entryPath + revision = metadata.get("revision") or metadata.get("lastModifiedDateTime") + + await _ingestOne( + adapter=adapter, + knowledgeService=knowledgeService, + runExtractionFn=runExtractionFn, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + entry=entry, + entryPath=entryPath, + mimeType=mimeType, + externalItemId=externalItemId, + revision=revision, + limits=limits, + result=result, + progressCb=progressCb, + ) + + +async def _ingestOne( + *, + adapter, + knowledgeService, + runExtractionFn, + connectionId: str, + mandateId: str, + userId: str, + entry, + entryPath: str, + mimeType: str, + externalItemId: str, + revision: Optional[str], + limits: SharepointBootstrapLimits, + result: SharepointBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge 
import IngestionJob + + syntheticFileId = _syntheticFileId(connectionId, externalItemId) + fileName = getattr(entry, "name", "") or externalItemId + + try: + fileBytes = await adapter.download(entryPath) + except Exception as exc: + logger.warning("sharepoint download %s failed: %s", entryPath, exc) + result.failed += 1 + result.errors.append(f"download({entryPath}): {exc}") + return + if not fileBytes: + result.failed += 1 + return + + result.bytesProcessed += len(fileBytes) + + try: + extracted = runExtractionFn( + fileBytes, fileName, mimeType, + ExtractionOptions(mergeStrategy=None), + ) + except Exception as exc: + logger.warning("sharepoint extraction %s failed: %s", entryPath, exc) + result.failed += 1 + result.errors.append(f"extract({entryPath}): {exc}") + return + + contentObjects = _toContentObjects(extracted, fileName) + if not contentObjects: + result.skippedPolicy += 1 + return + + provenance: Dict[str, Any] = { + "connectionId": connectionId, + "authority": "msft", + "service": "sharepoint", + "externalItemId": externalItemId, + "externalPath": entryPath, + "revision": revision, + } + try: + handle = await knowledgeService.requestIngestion( + IngestionJob( + sourceKind="sharepoint_item", + sourceId=syntheticFileId, + fileName=fileName, + mimeType=mimeType, + userId=userId, + mandateId=mandateId, + contentObjects=contentObjects, + contentVersion=revision, + provenance=provenance, + ) + ) + except Exception as exc: + logger.error("sharepoint ingestion %s failed: %s", entryPath, exc, exc_info=True) + result.failed += 1 + result.errors.append(f"ingest({entryPath}): {exc}") + return + + if handle.status == "duplicate": + result.skippedDuplicate += 1 + elif handle.status == "indexed": + result.indexed += 1 + else: + result.failed += 1 + if handle.error: + result.errors.append(f"ingest({entryPath}): {handle.error}") + + if progressCb is not None and (result.indexed + result.skippedDuplicate) % 50 == 0: + processed = result.indexed + result.skippedDuplicate 
+ try: + progressCb( + min(90, 10 + int(80 * processed / max(1, limits.maxItems))), + f"sharepoint processed={processed}", + ) + except Exception: + pass + logger.info( + "ingestion.connection.bootstrap.progress part=sharepoint processed=%d skippedDup=%d failed=%d", + processed, result.skippedDuplicate, result.failed, + extra={ + "event": "ingestion.connection.bootstrap.progress", + "part": "sharepoint", + "connectionId": connectionId, + "processed": processed, + "skippedDup": result.skippedDuplicate, + "failed": result.failed, + }, + ) + + # Yield so the event loop can interleave other tasks (download/extract are + # CPU-ish and extraction uses sync libs; cooperative scheduling prevents + # starving other workers). + await asyncio.sleep(0) + + +def _finalizeResult(connectionId: str, result: SharepointBootstrapResult, startMs: float) -> Dict[str, Any]: + durationMs = int((time.time() - startMs) * 1000) + logger.info( + "ingestion.connection.bootstrap.done part=sharepoint connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d durationMs=%d", + connectionId, + result.indexed, result.skippedDuplicate, result.skippedPolicy, result.failed, + durationMs, + extra={ + "event": "ingestion.connection.bootstrap.done", + "part": "sharepoint", + "connectionId": connectionId, + "indexed": result.indexed, + "skippedDup": result.skippedDuplicate, + "skippedPolicy": result.skippedPolicy, + "failed": result.failed, + "durationMs": durationMs, + }, + ) + return { + "connectionId": result.connectionId, + "indexed": result.indexed, + "skippedDuplicate": result.skippedDuplicate, + "skippedPolicy": result.skippedPolicy, + "failed": result.failed, + "bytesProcessed": result.bytesProcessed, + "durationMs": durationMs, + "errors": result.errors[:20], + } diff --git a/modules/serviceCenter/services/serviceKnowledge/subTextClean.py b/modules/serviceCenter/services/serviceKnowledge/subTextClean.py new file mode 100644 index 00000000..2d352cfa --- /dev/null +++ 
b/modules/serviceCenter/services/serviceKnowledge/subTextClean.py @@ -0,0 +1,107 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Text normalisation utilities used by knowledge ingestion. + +The email body cleaning logic is intentionally regex-based and works on plain +text after an HTML→text pass so we never store unsanitised HTML/JS in the +knowledge store and retrieval stays robust (no extraneous markup tokens +eating embedding budget). +""" + +from __future__ import annotations + +import re +from typing import Optional + +DEFAULT_MAX_CHARS = 8000 + + +_QUOTE_MARKER_PATTERNS = [ + re.compile(r"^\s*(?:On\s.+?\swrote:)\s*$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*(?:Am\s.+?\sschrieb.+?:)\s*$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*-{2,}\s*Original\s*Message\s*-{2,}\s*$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*-{2,}\s*Urspr.+Nachricht\s*-{2,}\s*$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*From:\s+.+$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*Von:\s+.+$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*Sent:\s+.+$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*Gesendet:\s+.+$", re.MULTILINE | re.IGNORECASE), +] + +_SIGNATURE_MARKERS = [ + re.compile(r"^\s*-{2,}\s*$", re.MULTILINE), + re.compile(r"^\s*—\s*$", re.MULTILINE), + re.compile(r"^\s*Best regards\b.*$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*Kind regards\b.*$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*Mit freundlichen Gr[üu]ßen\b.*$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*Viele Gr[üu]ße\b.*$", re.MULTILINE | re.IGNORECASE), + re.compile(r"^\s*Best,\s*$", re.MULTILINE | re.IGNORECASE), +] + + +def _htmlToText(html: str) -> str: + """Prefer BeautifulSoup when available, fall back to regex.""" + try: + from bs4 import BeautifulSoup # type: ignore + + soup = BeautifulSoup(html, "html.parser") + for tag in soup(["script", "style", "head"]): + tag.decompose() + for br in soup.find_all(["br"]): + 
br.replace_with("\n") + for p in soup.find_all(["p", "div", "li", "tr"]): + p.append("\n") + text = soup.get_text() + except Exception: + # Minimal fallback: strip tags crudely. + text = re.sub(r"<br\s*/?>", "\n", html, flags=re.IGNORECASE) + text = re.sub(r"</p>", "\n", text, flags=re.IGNORECASE) + text = re.sub(r"<[^>]+>", "", text) + # Collapse non-breaking + zero-width whitespace. + text = text.replace("\u00a0", " ").replace("\u200b", "") + return text + + +def _stripQuotedThread(text: str) -> str: + """Remove reply-chain content so only the author's own contribution remains.""" + earliest = len(text) + for pattern in _QUOTE_MARKER_PATTERNS: + match = pattern.search(text) + if match and match.start() < earliest: + earliest = match.start() + # Drop any block starting with "> " quoted lines (often Gmail/Thunderbird). + quotedBlock = re.search(r"^(?:\s*>.*\n?)+", text, re.MULTILINE) + if quotedBlock and quotedBlock.start() < earliest: + earliest = quotedBlock.start() + return text[:earliest].rstrip() + + +def _stripSignature(text: str) -> str: + earliest = len(text) + for pattern in _SIGNATURE_MARKERS: + match = pattern.search(text) + if match and match.start() < earliest: + earliest = match.start() + return text[:earliest].rstrip() + + +def _collapseWhitespace(text: str) -> str: + text = re.sub(r"[ \t]+", " ", text) + text = re.sub(r"\n{3,}", "\n\n", text) + return text.strip() + + +def cleanEmailBody(html: str, maxChars: Optional[int] = DEFAULT_MAX_CHARS) -> str: + """Return a compact plain-text view of an email body suitable for embedding. + + Steps: HTML → text, remove quoted reply chain, remove signature, collapse + whitespace, truncate to maxChars. Always returns a string (possibly empty). 
+ """ + if not html: + return "" + text = _htmlToText(html) if "<" in html and ">" in html else html + text = _stripQuotedThread(text) + text = _stripSignature(text) + text = _collapseWhitespace(text) + if maxChars and len(text) > maxChars: + text = text[:maxChars].rstrip() + "…" + return text diff --git a/tests/unit/services/test_bootstrap_outlook.py b/tests/unit/services/test_bootstrap_outlook.py new file mode 100644 index 00000000..26664eaa --- /dev/null +++ b/tests/unit/services/test_bootstrap_outlook.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Bootstrap Outlook tests with a fake adapter + knowledge service. + +Verifies: +- Well-known folders (inbox, sentitems) are discovered via Graph. +- Each message produces a `requestIngestion` call with sourceKind=outlook_message + and structured contentObjects (header / snippet / body). +- Pagination via `@odata.nextLink` is followed. +- changeKey is forwarded as contentVersion → idempotency. 
+""" + +import asyncio +import os +import sys +from types import SimpleNamespace + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) + +from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncOutlook import ( + bootstrapOutlook, + OutlookBootstrapLimits, + _syntheticMessageId, + _buildContentObjects, +) + + +class _FakeOutlookAdapter: + def __init__(self, messages_by_folder, paginated_folder=None, page2=None): + self._folders = {"inbox": "INBOX-ID", "sentitems": "SENT-ID"} + self._messages = messages_by_folder + self._paginated_folder = paginated_folder + self._page2 = page2 or [] + self.requested_endpoints = [] + + async def _graphGet(self, endpoint: str): + self.requested_endpoints.append(endpoint) + if endpoint.startswith("me/mailFolders/") and "/messages" not in endpoint: + wellKnown = endpoint.split("/")[-1] + fid = self._folders.get(wellKnown) + if not fid: + return {"error": "not found"} + return {"id": fid, "displayName": wellKnown} + # message page request: e.g. me/mailFolders/INBOX-ID/messages?... 
+ for fid, messages in self._messages.items(): + if f"me/mailFolders/{fid}/messages" in endpoint: + page = {"value": messages} + if fid == self._paginated_folder and "skiptoken" not in endpoint: + page["@odata.nextLink"] = ( + "https://graph.microsoft.com/v1.0/" + f"me/mailFolders/{fid}/messages?$skiptoken=abc" + ) + elif fid == self._paginated_folder and "skiptoken" in endpoint: + page = {"value": self._page2} + return page + return {"value": []} + + async def browse(self, path): + return [] + + +class _FakeKnowledgeService: + def __init__(self, duplicateIds=None): + self.calls = [] + self._duplicates = duplicateIds or set() + + async def requestIngestion(self, job): + self.calls.append(job) + status = "duplicate" if job.sourceId in self._duplicates else "indexed" + return SimpleNamespace( + jobId=job.sourceId, status=status, contentHash="h", + fileId=job.sourceId, index=None, error=None, + ) + + +def _msg(mid: str, subject: str = "Hi", change: str = "ck1"): + return { + "id": mid, + "subject": subject, + "from": {"emailAddress": {"name": "Alice", "address": "a@x.com"}}, + "toRecipients": [{"emailAddress": {"name": "Bob", "address": "b@x.com"}}], + "ccRecipients": [], + "receivedDateTime": "2025-01-01T10:00:00Z", + "bodyPreview": "Hello world", + "body": {"contentType": "text", "content": "Hello world\nThis is the body."}, + "internetMessageId": f"<{mid}@local>", + "hasAttachments": False, + "changeKey": change, + } + + +def test_buildContentObjects_emits_header_snippet_body(): + parts = _buildContentObjects(_msg("m1"), maxBodyChars=8000) + ids = [p["contentObjectId"] for p in parts] + assert ids == ["header", "snippet", "body"] + header = parts[0]["data"] + assert "Subject: Hi" in header + assert "From: Alice " in header + assert "To: Bob " in header + + +def test_bootstrap_outlook_indexes_messages_from_inbox_and_sent(): + adapter = _FakeOutlookAdapter({ + "INBOX-ID": [_msg("m1"), _msg("m2")], + "SENT-ID": [_msg("m3")], + }) + knowledge = _FakeKnowledgeService() 
+ connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapOutlook( + connectionId="c1", + adapter=adapter, + connection=connection, + knowledgeService=knowledge, + limits=OutlookBootstrapLimits(maxAgeDays=None), + ) + + result = asyncio.run(_run()) + assert result["indexed"] == 3 + sourceIds = {c.sourceId for c in knowledge.calls} + assert sourceIds == { + _syntheticMessageId("c1", "m1"), + _syntheticMessageId("c1", "m2"), + _syntheticMessageId("c1", "m3"), + } + for job in knowledge.calls: + assert job.sourceKind == "outlook_message" + assert job.mimeType == "message/rfc822" + assert job.provenance["connectionId"] == "c1" + assert job.provenance["service"] == "outlook" + assert job.contentVersion == "ck1" + assert any(co["contentObjectId"] == "header" for co in job.contentObjects) + + +def test_bootstrap_outlook_follows_pagination(): + adapter = _FakeOutlookAdapter( + messages_by_folder={"INBOX-ID": [_msg("m1")], "SENT-ID": []}, + paginated_folder="INBOX-ID", + page2=[_msg("m2"), _msg("m3")], + ) + knowledge = _FakeKnowledgeService() + connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapOutlook( + connectionId="c1", + adapter=adapter, + connection=connection, + knowledgeService=knowledge, + limits=OutlookBootstrapLimits(maxAgeDays=None), + ) + + result = asyncio.run(_run()) + assert result["indexed"] == 3 + + +def test_bootstrap_outlook_reports_duplicates(): + adapter = _FakeOutlookAdapter({ + "INBOX-ID": [_msg("m1"), _msg("m2")], + "SENT-ID": [], + }) + duplicates = { + _syntheticMessageId("c1", "m1"), + _syntheticMessageId("c1", "m2"), + } + knowledge = _FakeKnowledgeService(duplicateIds=duplicates) + connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapOutlook( + connectionId="c1", + adapter=adapter, + connection=connection, + knowledgeService=knowledge, + limits=OutlookBootstrapLimits(maxAgeDays=None), + ) + + 
result = asyncio.run(_run()) + assert result["indexed"] == 0 + assert result["skippedDuplicate"] == 2 + + +if __name__ == "__main__": + test_buildContentObjects_emits_header_snippet_body() + test_bootstrap_outlook_indexes_messages_from_inbox_and_sent() + test_bootstrap_outlook_follows_pagination() + test_bootstrap_outlook_reports_duplicates() + print("OK — bootstrapOutlook tests passed") diff --git a/tests/unit/services/test_bootstrap_sharepoint.py b/tests/unit/services/test_bootstrap_sharepoint.py new file mode 100644 index 00000000..8b011357 --- /dev/null +++ b/tests/unit/services/test_bootstrap_sharepoint.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Bootstrap SharePoint tests with a fake adapter + knowledge service. + +Verifies: +- Every discovered file triggers `requestIngestion`. +- Duplicate runs (same eTag revisions) report `skippedDuplicate`. +- Synthetic fileIds are stable across runs so idempotency works end-to-end. +""" + +import asyncio +import os +import sys +from dataclasses import dataclass +from types import SimpleNamespace +from typing import Any, Dict, List, Optional + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) + +from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint import ( + bootstrapSharepoint, + _syntheticFileId, +) + + +@dataclass +class _ExtEntry: + name: str + path: str + isFolder: bool = False + size: Optional[int] = None + mimeType: Optional[str] = None + metadata: Dict[str, Any] = None + + +class _FakeSpAdapter: + """Minimal SharepointAdapter stand-in. 
+ + Layout: + "/" → 1 site + "/sites/site-1" → 2 files (f1, f2) + 1 folder (sub) + "/sites/site-1/sub" → 1 file (f3) + """ + + def __init__(self): + self.downloaded: List[str] = [] + + async def browse(self, path: str, filter=None, limit=None): + if path == "/": + return [ + _ExtEntry( + name="Site 1", + path="/sites/site-1", + isFolder=True, + metadata={"id": "site-1"}, + ), + ] + if path == "/sites/site-1": + return [ + _ExtEntry( + name="f1.txt", path="/sites/site-1/f1.txt", + mimeType="text/plain", size=20, + metadata={"id": "f1", "revision": "etag-f1"}, + ), + _ExtEntry( + name="f2.txt", path="/sites/site-1/f2.txt", + mimeType="text/plain", size=20, + metadata={"id": "f2", "revision": "etag-f2"}, + ), + _ExtEntry( + name="sub", path="/sites/site-1/sub", + isFolder=True, metadata={"id": "sub"}, + ), + ] + if path == "/sites/site-1/sub": + return [ + _ExtEntry( + name="f3.txt", path="/sites/site-1/sub/f3.txt", + mimeType="text/plain", size=20, + metadata={"id": "f3", "revision": "etag-f3"}, + ), + ] + return [] + + async def download(self, path: str) -> bytes: + self.downloaded.append(path) + return path.encode("utf-8") + + +class _FakeKnowledgeService: + """Records requestIngestion calls and returns the scripted handles.""" + + def __init__(self, duplicateIds=None): + self.calls: List[SimpleNamespace] = [] + self._duplicateIds = duplicateIds or set() + + async def requestIngestion(self, job): + self.calls.append(job) + status = "duplicate" if job.sourceId in self._duplicateIds else "indexed" + return SimpleNamespace( + jobId=f"{job.sourceKind}:{job.sourceId}", + status=status, + contentHash="h", + fileId=job.sourceId, + index=None, + error=None, + ) + + +def _fakeRunExtraction(data, name, mime, options): + """Produce a single synthetic text part so `_toContentObjects` returns one.""" + return SimpleNamespace( + parts=[ + SimpleNamespace( + id="p1", + data=data.decode("utf-8") if isinstance(data, bytes) else str(data), + typeGroup="text", + label="page:1", + 
metadata={"pageIndex": 0}, + ) + ] + ) + + +def test_bootstrap_walks_sites_and_subfolders(): + adapter = _FakeSpAdapter() + knowledge = _FakeKnowledgeService() + connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapSharepoint( + connectionId="c1", + adapter=adapter, + connection=connection, + knowledgeService=knowledge, + runExtractionFn=_fakeRunExtraction, + ) + + result = asyncio.run(_run()) + assert len(knowledge.calls) == 3 + sourceIds = {c.sourceId for c in knowledge.calls} + assert sourceIds == { + _syntheticFileId("c1", "f1"), + _syntheticFileId("c1", "f2"), + _syntheticFileId("c1", "f3"), + } + assert result["indexed"] == 3 + assert result["skippedDuplicate"] == 0 + assert adapter.downloaded == [ + "/sites/site-1/f1.txt", + "/sites/site-1/f2.txt", + "/sites/site-1/sub/f3.txt", + ] + + +def test_bootstrap_reports_duplicates_on_second_run(): + adapter = _FakeSpAdapter() + duplicateIds = { + _syntheticFileId("c1", "f1"), + _syntheticFileId("c1", "f2"), + _syntheticFileId("c1", "f3"), + } + knowledge = _FakeKnowledgeService(duplicateIds=duplicateIds) + connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapSharepoint( + connectionId="c1", + adapter=adapter, + connection=connection, + knowledgeService=knowledge, + runExtractionFn=_fakeRunExtraction, + ) + + result = asyncio.run(_run()) + assert result["indexed"] == 0 + assert result["skippedDuplicate"] == 3 + + +def test_bootstrap_passes_connection_provenance(): + adapter = _FakeSpAdapter() + knowledge = _FakeKnowledgeService() + connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapSharepoint( + connectionId="c1", + adapter=adapter, + connection=connection, + knowledgeService=knowledge, + runExtractionFn=_fakeRunExtraction, + ) + + asyncio.run(_run()) + for job in knowledge.calls: + assert job.sourceKind == "sharepoint_item" + assert job.mandateId == "m1" + assert 
job.provenance["connectionId"] == "c1" + assert job.provenance["authority"] == "msft" + assert job.provenance["service"] == "sharepoint" + assert job.contentVersion and job.contentVersion.startswith("etag-") + + +if __name__ == "__main__": + test_bootstrap_walks_sites_and_subfolders() + test_bootstrap_reports_duplicates_on_second_run() + test_bootstrap_passes_connection_provenance() + print("OK — bootstrapSharepoint tests passed") diff --git a/tests/unit/services/test_clean_email_body.py b/tests/unit/services/test_clean_email_body.py new file mode 100644 index 00000000..a3ee01df --- /dev/null +++ b/tests/unit/services/test_clean_email_body.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Unit tests for cleanEmailBody. + +Covers: HTML→text normalisation, quoted-reply removal, signature removal, +whitespace collapse and truncation. The utility is used during Outlook +bootstrap; buggy cleaning would leak quoted threads / signatures into every +embedding. +""" + +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) + +from modules.serviceCenter.services.serviceKnowledge.subTextClean import ( + cleanEmailBody, +) + + +def test_strips_html_tags_and_scripts(): + html = ( + "" + "
+        "<html><head><script>alert('x');</script></head>"
+        "<body><p>Hello <b>world</b></p></body>"
+        "</html>"
" + "" + ) + cleaned = cleanEmailBody(html) + assert "Hello" in cleaned + assert "world" in cleaned + assert "<" not in cleaned + assert "alert" not in cleaned + + +def test_strips_quoted_reply_english(): + body = ( + "Actual answer from me.\n\n" + "On Mon, 1 Jan 2024 at 10:00, Someone wrote:\n" + "> Original question?\n" + "> Second line.\n" + ) + cleaned = cleanEmailBody(body) + assert "Actual answer" in cleaned + assert "Original question" not in cleaned + assert "wrote:" not in cleaned + + +def test_strips_quoted_reply_german(): + body = ( + "Meine Antwort.\n\n" + "Am 1. Januar 2024 um 10:00 schrieb Max Muster :\n" + "> Ursprüngliche Frage?\n" + ) + cleaned = cleanEmailBody(body) + assert "Meine Antwort" in cleaned + assert "Ursprüngliche Frage" not in cleaned + + +def test_strips_signature_after_dashes(): + body = ( + "Kurze Nachricht.\n" + "\n" + "--\n" + "Max Muster\n" + "Vorstand, Beispiel GmbH\n" + ) + cleaned = cleanEmailBody(body) + assert "Kurze Nachricht" in cleaned + assert "Beispiel GmbH" not in cleaned + + +def test_strips_signature_salutation_de(): + body = ( + "Die eigentliche Information steht hier.\n\n" + "Mit freundlichen Grüßen\n" + "Max Muster" + ) + cleaned = cleanEmailBody(body) + assert "eigentliche Information" in cleaned + assert "Max Muster" not in cleaned + + +def test_truncate_to_max_chars(): + body = "abc " * 5000 + cleaned = cleanEmailBody(body, maxChars=200) + assert len(cleaned) <= 201 # includes trailing ellipsis + + +def test_empty_input_returns_empty_string(): + assert cleanEmailBody("") == "" + assert cleanEmailBody(None) == "" # type: ignore[arg-type] + + +def test_collapses_whitespace(): + body = "A lot of spaces\n\n\n\nand blank lines" + cleaned = cleanEmailBody(body) + assert " " not in cleaned + assert "\n\n\n" not in cleaned + + +if __name__ == "__main__": + test_strips_html_tags_and_scripts() + test_strips_quoted_reply_english() + test_strips_quoted_reply_german() + test_strips_signature_after_dashes() + 
test_strips_signature_salutation_de() + test_truncate_to_max_chars() + test_empty_input_returns_empty_string() + test_collapses_whitespace() + print("OK — cleanEmailBody tests passed") diff --git a/tests/unit/services/test_connection_purge.py b/tests/unit/services/test_connection_purge.py new file mode 100644 index 00000000..c32cb5b3 --- /dev/null +++ b/tests/unit/services/test_connection_purge.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Purge tests for KnowledgeObjects.deleteFileContentIndexByConnectionId. + +Ensures that a `connection.revoked` event wipes every FileContentIndex + chunk +linked to the given connectionId while leaving entries from other connections +(or upload-files with connectionId=None) intact. +""" + +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) + +from modules.datamodels.datamodelKnowledge import FileContentIndex, ContentChunk +from modules.interfaces.interfaceDbKnowledge import KnowledgeObjects + + +class _FakeDb: + """Minimal in-memory stand-in for ``KnowledgeObjects.db``. + + Supports just the subset of APIs that deleteFileContentIndexByConnectionId + touches: getRecordset(FileContentIndex|ContentChunk, ...) + recordDelete. 
+ """ + + def __init__(self): + self.indexRows: dict = {} + self.chunks: dict = {} + + def addIndex(self, row: dict) -> None: + self.indexRows[row["id"]] = row + + def addChunk(self, row: dict) -> None: + self.chunks[row["id"]] = row + + def getRecordset(self, modelClass, recordFilter=None, **_): + filter_ = recordFilter or {} + if modelClass is FileContentIndex: + rows = list(self.indexRows.values()) + elif modelClass is ContentChunk: + rows = list(self.chunks.values()) + else: + return [] + + def match(row): + for k, v in filter_.items(): + if row.get(k) != v: + return False + return True + + return [r for r in rows if match(r)] + + def recordDelete(self, modelClass, recordId): + if modelClass is FileContentIndex: + return self.indexRows.pop(recordId, None) is not None + if modelClass is ContentChunk: + return self.chunks.pop(recordId, None) is not None + return False + + +def _buildKnowledge(): + """Instantiate KnowledgeObjects without triggering the real DB bootstrap.""" + ko = KnowledgeObjects.__new__(KnowledgeObjects) + ko.currentUser = None + ko.userId = None + ko._scopeCache = {} + ko.db = _FakeDb() + return ko + + +def test_purge_by_connection_removes_only_matching_rows(): + ko = _buildKnowledge() + ko.db.addIndex({"id": "sp1", "connectionId": "cx", "mandateId": "m1", "sourceKind": "sharepoint_item"}) + ko.db.addIndex({"id": "sp2", "connectionId": "cx", "mandateId": "m1", "sourceKind": "sharepoint_item"}) + ko.db.addIndex({"id": "upload", "connectionId": None, "mandateId": "m1", "sourceKind": "file"}) + ko.db.addIndex({"id": "other", "connectionId": "cy", "mandateId": "m1", "sourceKind": "outlook_message"}) + ko.db.addChunk({"id": "c1", "fileId": "sp1"}) + ko.db.addChunk({"id": "c2", "fileId": "sp1"}) + ko.db.addChunk({"id": "c3", "fileId": "sp2"}) + ko.db.addChunk({"id": "c4", "fileId": "upload"}) + ko.db.addChunk({"id": "c5", "fileId": "other"}) + + result = ko.deleteFileContentIndexByConnectionId("cx") + + assert result == {"indexRows": 2, "chunks": 3} 
+ assert "sp1" not in ko.db.indexRows + assert "sp2" not in ko.db.indexRows + assert "upload" in ko.db.indexRows + assert "other" in ko.db.indexRows + assert set(ko.db.chunks.keys()) == {"c4", "c5"} + + +def test_purge_with_empty_connection_id_is_a_noop(): + ko = _buildKnowledge() + ko.db.addIndex({"id": "sp1", "connectionId": "cx"}) + ko.db.addChunk({"id": "c1", "fileId": "sp1"}) + + result = ko.deleteFileContentIndexByConnectionId("") + + assert result == {"indexRows": 0, "chunks": 0} + assert "sp1" in ko.db.indexRows + + +def test_purge_unknown_connection_returns_zero(): + ko = _buildKnowledge() + ko.db.addIndex({"id": "sp1", "connectionId": "cx"}) + + result = ko.deleteFileContentIndexByConnectionId("nope") + + assert result == {"indexRows": 0, "chunks": 0} + assert "sp1" in ko.db.indexRows + + +if __name__ == "__main__": + test_purge_by_connection_removes_only_matching_rows() + test_purge_with_empty_connection_id_is_a_noop() + test_purge_unknown_connection_returns_zero() + print("OK — connection-purge tests passed") diff --git a/tests/unit/services/test_knowledge_ingest_consumer.py b/tests/unit/services/test_knowledge_ingest_consumer.py new file mode 100644 index 00000000..760e1ed6 --- /dev/null +++ b/tests/unit/services/test_knowledge_ingest_consumer.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Unit tests for KnowledgeIngestionConsumer event dispatch. + +- `connection.established` → enqueue a `connection.bootstrap` job. +- `connection.revoked` → synchronous purge via KnowledgeObjects. 
+""" + +import asyncio +import os +import sys +import types + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) + +from modules.serviceCenter.services.serviceKnowledge import subConnectorIngestConsumer as consumer + + +def _resetRegistration(monkeypatch): + """Force the module-level guard to register fresh in each test.""" + monkeypatch.setattr(consumer, "_registered", False) + + +def test_onConnectionEstablished_enqueues_bootstrap(monkeypatch): + startedJobs = [] + + async def _fakeStartJob(jobType, payload, **kwargs): + startedJobs.append({"jobType": jobType, "payload": payload, "kwargs": kwargs}) + return "job-1" + + monkeypatch.setattr(consumer, "startJob", _fakeStartJob) + consumer._onConnectionEstablished( + connectionId="c1", authority="msft", userId="u1" + ) + # Drain pending tasks created by the consumer. + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + # If the consumer created a Task on a closed loop the fake startJob + # was still called synchronously via asyncio.run — in either case we + # check the recorded call. 
+ finally: + loop.close() + + assert len(startedJobs) == 1 + assert startedJobs[0]["jobType"] == consumer.BOOTSTRAP_JOB_TYPE + assert startedJobs[0]["payload"]["connectionId"] == "c1" + assert startedJobs[0]["payload"]["authority"] == "msft" + assert startedJobs[0]["kwargs"]["triggeredBy"] == "u1" + + +def test_onConnectionEstablished_ignores_missing_id(monkeypatch): + called = [] + + async def _fakeStartJob(*a, **kw): + called.append(1) + return "x" + + monkeypatch.setattr(consumer, "startJob", _fakeStartJob) + consumer._onConnectionEstablished(connectionId="", authority="msft") + assert called == [] + + +def test_onConnectionRevoked_runs_sync_purge(monkeypatch): + class _FakeKnowledge: + def __init__(self): + self.calls = [] + + def deleteFileContentIndexByConnectionId(self, cid): + self.calls.append(cid) + return {"indexRows": 2, "chunks": 5} + + fakeKnow = _FakeKnowledge() + + def _fakeGetInterface(_user=None): + return fakeKnow + + monkeypatch.setattr(consumer, "getKnowledgeInterface", _fakeGetInterface) + consumer._onConnectionRevoked( + connectionId="c1", authority="msft", userId="u1", reason="disconnected" + ) + assert fakeKnow.calls == ["c1"] + + +def test_onConnectionRevoked_ignores_missing_id(monkeypatch): + seen = [] + + def _fakeGetInterface(_user=None): + class _K: + def deleteFileContentIndexByConnectionId(self, cid): + seen.append(cid) + return {"indexRows": 0, "chunks": 0} + + return _K() + + monkeypatch.setattr(consumer, "getKnowledgeInterface", _fakeGetInterface) + consumer._onConnectionRevoked(connectionId="") + assert seen == [] + + +def test_bootstrap_job_skips_non_pilot_authority(monkeypatch): + async def _run(): + result = await consumer._bootstrapJobHandler( + {"payload": {"connectionId": "c1", "authority": "google"}}, + lambda *_: None, + ) + return result + + result = asyncio.run(_run()) + assert result["skipped"] is True + assert result["authority"] == "google" + + +def test_bootstrap_job_dispatches_msft_parts(monkeypatch): + calls = 
{"sp": 0, "ol": 0} + + async def _fakeSp(connectionId, progressCb=None): + calls["sp"] += 1 + return {"indexed": 1} + + async def _fakeOl(connectionId, progressCb=None): + calls["ol"] += 1 + return {"indexed": 2} + + # subConnectorSync* are lazy-imported inside the handler; install fake + # modules before invoking. + fakeSharepoint = types.ModuleType("subConnectorSyncSharepoint") + fakeSharepoint.bootstrapSharepoint = _fakeSp + fakeOutlook = types.ModuleType("subConnectorSyncOutlook") + fakeOutlook.bootstrapOutlook = _fakeOl + monkeypatch.setitem( + sys.modules, + "modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint", + fakeSharepoint, + ) + monkeypatch.setitem( + sys.modules, + "modules.serviceCenter.services.serviceKnowledge.subConnectorSyncOutlook", + fakeOutlook, + ) + + async def _run(): + return await consumer._bootstrapJobHandler( + {"payload": {"connectionId": "c1", "authority": "msft"}}, + lambda *_: None, + ) + + result = asyncio.run(_run()) + assert calls == {"sp": 1, "ol": 1} + assert result["sharepoint"] == {"indexed": 1} + assert result["outlook"] == {"indexed": 2} + + +if __name__ == "__main__": + # Usable without pytest fixtures for a quick smoke run. + class _MP: + def __init__(self): + self.undos = [] + + def setattr(self, target, name_or_value, value=None): + if value is None: + # target is an object, name_or_value is value → no, original signature + raise SystemExit("use pytest monkeypatch in CLI") + self.undos.append((target, name_or_value, getattr(target, name_or_value))) + setattr(target, name_or_value, value) + + def setitem(self, mapping, key, value): + self.undos.append((mapping, key, mapping.get(key))) + mapping[key] = value + + print("Run via pytest: pytest tests/unit/services/test_knowledge_ingest_consumer.py")