diff --git a/app.py b/app.py
index 41271739..52e70982 100644
--- a/app.py
+++ b/app.py
@@ -405,6 +405,16 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.warning(f"BackgroundJob recovery failed (non-critical): {e}")
+ # Subscribe knowledge ingestion to connection lifecycle events so OAuth
+ # connect/disconnect reliably trigger bootstrap/purge.
+ try:
+ from modules.serviceCenter.services.serviceKnowledge.subConnectorIngestConsumer import (
+ registerKnowledgeIngestionConsumer,
+ )
+ registerKnowledgeIngestionConsumer()
+ except Exception as e:
+ logger.warning(f"KnowledgeIngestionConsumer registration failed (non-critical): {e}")
+
yield
# --- Stop Managers ---
diff --git a/modules/connectors/providerMsft/connectorMsft.py b/modules/connectors/providerMsft/connectorMsft.py
index bf290eca..49f6fdaa 100644
--- a/modules/connectors/providerMsft/connectorMsft.py
+++ b/modules/connectors/providerMsft/connectorMsft.py
@@ -126,6 +126,11 @@ def _stripGraphBase(url: str) -> str:
def _graphItemToExternalEntry(item: Dict[str, Any], basePath: str = "") -> ExternalEntry:
isFolder = "folder" in item
+ # Graph exposes change markers on the driveItem as ``eTag`` (quoted,
+ # changes on any update) and ``cTag`` (changes on content updates only);
+ # we normalise them to a single "revision" string so callers can use it
+ # as a stable ``contentVersion`` for idempotent ingestion without
+ # re-downloading file bytes.
+ revision = item.get("eTag") or item.get("cTag")
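+ # Illustration (field shapes follow the Graph driveItem resource; values
+ # are made up): {"eTag": '"{AB12},2"', "cTag": '"c:{AB12},1"'} yields
+ # revision == '"{AB12},2"'.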
return ExternalEntry(
name=item.get("name", ""),
path=f"{basePath}/{item.get('name', '')}" if basePath else item.get("name", ""),
@@ -137,6 +142,9 @@ def _graphItemToExternalEntry(item: Dict[str, Any], basePath: str = "") -> Exter
"id": item.get("id"),
"webUrl": item.get("webUrl"),
"childCount": item.get("folder", {}).get("childCount") if isFolder else None,
+ "revision": revision,
+ "lastModifiedDateTime": item.get("lastModifiedDateTime"),
+ "parentReference": item.get("parentReference", {}),
},
)
@@ -167,21 +175,36 @@ class SharepointAdapter(_GraphApiMixin, ServiceAdapter):
return await self._discoverSites()
if not folderPath or folderPath == "/":
- endpoint = f"sites/{siteId}/drive/root/children"
+ endpoint: Optional[str] = f"sites/{siteId}/drive/root/children?$top=200"
else:
cleanPath = folderPath.lstrip("/")
- endpoint = f"sites/{siteId}/drive/root:/{cleanPath}:/children"
+ endpoint = f"sites/{siteId}/drive/root:/{cleanPath}:/children?$top=200"
- result = await self._graphGet(endpoint)
- if "error" in result:
- logger.warning(f"SharePoint browse failed: {result['error']}")
- return []
+ # Follow @odata.nextLink pages until the listing is exhausted, the
+ # caller's limit is satisfied, or a hard cap is reached, so large
+ # libraries are fully enumerated (required for bootstrap). The per-page
+ # size uses Graph's max supported value to minimise round-trips.
+ effectiveLimit = int(limit) if limit is not None else None
+ items: List[Dict[str, Any]] = []
+ hardCap = 5000
+ while endpoint and len(items) < hardCap:
+ result = await self._graphGet(endpoint)
+ if "error" in result:
+ logger.warning(f"SharePoint browse failed: {result['error']}")
+ break
+ for raw in result.get("value", []) or []:
+ items.append(raw)
+ if effectiveLimit is not None and len(items) >= effectiveLimit:
+ break
+ if effectiveLimit is not None and len(items) >= effectiveLimit:
+ break
+ nextLink = result.get("@odata.nextLink")
+ endpoint = _stripGraphBase(nextLink) if nextLink else None
- entries = [_graphItemToExternalEntry(item, path) for item in result.get("value", [])]
+ entries = [_graphItemToExternalEntry(item, path) for item in items]
if filter:
entries = [e for e in entries if _matchFilter(e, filter)]
- if limit is not None:
- entries = entries[: max(1, int(limit))]
+ if effectiveLimit is not None:
+ entries = entries[: max(1, effectiveLimit)]
return entries
async def _discoverSites(self) -> List[ExternalEntry]:
diff --git a/modules/datamodels/datamodelKnowledge.py b/modules/datamodels/datamodelKnowledge.py
index 163328a4..d0af2216 100644
--- a/modules/datamodels/datamodelKnowledge.py
+++ b/modules/datamodels/datamodelKnowledge.py
@@ -90,6 +90,16 @@ class FileContentIndex(PowerOnModel):
description="Data visibility scope: personal, featureInstance, mandate, global",
json_schema_extra={"label": "Sichtbarkeit"},
)
+ sourceKind: str = Field(
+ default="file",
+ description="Origin of the indexed content: file, sharepoint_item, outlook_message, outlook_attachment, ...",
+ json_schema_extra={"label": "Quellenart"},
+ )
+ connectionId: Optional[str] = Field(
+ default=None,
+ description="UserConnection ID if this index entry originates from an external connector",
+ json_schema_extra={"label": "Connection-ID"},
+ )
neutralizationStatus: Optional[str] = Field(
default=None,
description="Neutralization status: completed, failed, skipped, None = not required",
diff --git a/modules/interfaces/interfaceDbKnowledge.py b/modules/interfaces/interfaceDbKnowledge.py
index f819615e..e5a14147 100644
--- a/modules/interfaces/interfaceDbKnowledge.py
+++ b/modules/interfaces/interfaceDbKnowledge.py
@@ -93,6 +93,46 @@ class KnowledgeObjects:
self.db.recordModify(FileContentIndex, fileId, {"status": status})
return True
+ def deleteFileContentIndexByConnectionId(self, connectionId: str) -> Dict[str, int]:
+ """Delete all FileContentIndex rows (and their ContentChunks) for a connection.
+
+ Used when a UserConnection is revoked / disconnected so the knowledge corpus
+ no longer references data the user no longer grants access to. Returns a dict
+ with counts to support observability logs.
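+ Illustrative result: {"indexRows": 3, "chunks": 57}.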
+ """
+ if not connectionId:
+ return {"indexRows": 0, "chunks": 0}
+
+ rows = self.db.getRecordset(
+ FileContentIndex, recordFilter={"connectionId": connectionId}
+ )
+ mandateIds: set = set()
+ chunkCount = 0
+ indexCount = 0
+ for row in rows:
+ fid = row.get("id") if isinstance(row, dict) else getattr(row, "id", None)
+ mid = row.get("mandateId") if isinstance(row, dict) else getattr(row, "mandateId", "")
+ if not fid:
+ continue
+ chunks = self.db.getRecordset(ContentChunk, recordFilter={"fileId": fid})
+ for chunk in chunks:
+ cid = chunk.get("id") if isinstance(chunk, dict) else getattr(chunk, "id", None)
+ if cid and self.db.recordDelete(ContentChunk, cid):
+ chunkCount += 1
+ if self.db.recordDelete(FileContentIndex, fid):
+ indexCount += 1
+ if mid:
+ mandateIds.add(str(mid))
+
+ for mid in mandateIds:
+ try:
+ from modules.interfaces.interfaceDbBilling import _getRootInterface
+
+ _getRootInterface().reconcileMandateStorageBilling(mid)
+ except Exception as ex:
+ logger.warning("reconcileMandateStorageBilling after connection purge failed: %s", ex)
+
+ return {"indexRows": indexCount, "chunks": chunkCount}
+
def deleteFileContentIndex(self, fileId: str) -> bool:
"""Delete a FileContentIndex and all associated ContentChunks."""
existing = self.getFileContentIndex(fileId)
diff --git a/modules/routes/routeDataConnections.py b/modules/routes/routeDataConnections.py
index 8e7a730d..b8ccf4bf 100644
--- a/modules/routes/routeDataConnections.py
+++ b/modules/routes/routeDataConnections.py
@@ -586,8 +586,25 @@ def disconnect_service(
detail=routeApiMsg("Connection not found")
)
- # Update connection status
- connection.status = ConnectionStatus.INACTIVE
+ # Fire the revoked event BEFORE the DB status change so the knowledge
+ # purge and the status mutation form one logical step and subscribers
+ # see the connection as it was. INACTIVE does not exist on the enum;
+ # REVOKED is the correct terminal-but-retained state (row deletion is
+ # handled in DELETE /{id}).
+ try:
+ from modules.shared.callbackRegistry import callbackRegistry
+
+ callbackRegistry.trigger(
+ "connection.revoked",
+ connectionId=connectionId,
+ authority=str(getattr(connection.authority, "value", connection.authority) or ""),
+ userId=str(currentUser.id),
+ reason="disconnected",
+ )
+ except Exception as _cbErr:
+ logger.warning("connection.revoked callback failed for %s: %s", connectionId, _cbErr)
+
+ connection.status = ConnectionStatus.REVOKED
connection.lastChecked = getUtcTimestamp()
# Update connection record - models now handle timestamp serialization automatically
@@ -636,6 +653,23 @@ def delete_connection(
detail=routeApiMsg("Connection not found")
)
+ # Fire the revoked event BEFORE the row disappears so consumers still
+ # have authority/connection context for observability; the purge itself
+ # targets FileContentIndex rows by connectionId, which are unaffected
+ # by the UserConnection delete.
+ try:
+ from modules.shared.callbackRegistry import callbackRegistry
+
+ callbackRegistry.trigger(
+ "connection.revoked",
+ connectionId=connectionId,
+ authority=str(getattr(connection.authority, "value", connection.authority) or ""),
+ userId=str(currentUser.id),
+ reason="deleted",
+ )
+ except Exception as _cbErr:
+ logger.warning("connection.revoked callback failed for %s: %s", connectionId, _cbErr)
+
# Remove the connection - only need connectionId since permissions are verified
interface.removeUserConnection(connectionId)
diff --git a/modules/routes/routeSecurityClickup.py b/modules/routes/routeSecurityClickup.py
index ca787391..698e3ca1 100644
--- a/modules/routes/routeSecurityClickup.py
+++ b/modules/routes/routeSecurityClickup.py
@@ -241,6 +241,18 @@ async def auth_connect_callback(
)
interface.saveConnectionToken(token)
+ try:
+ from modules.shared.callbackRegistry import callbackRegistry
+
+ callbackRegistry.trigger(
+ "connection.established",
+ connectionId=connection.id,
+ authority=str(getattr(connection.authority, "value", connection.authority) or "clickup"),
+ userId=str(user.id),
+ )
+ except Exception as _cbErr:
+ logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr)
+
return HTMLResponse(
content=f"""
diff --git a/modules/routes/routeSecurityGoogle.py b/modules/routes/routeSecurityGoogle.py
index 523523ee..2b6a70f5 100644
--- a/modules/routes/routeSecurityGoogle.py
+++ b/modules/routes/routeSecurityGoogle.py
@@ -479,6 +479,18 @@ async def auth_connect_callback(
)
interface.saveConnectionToken(token)
+ try:
+ from modules.shared.callbackRegistry import callbackRegistry
+
+ callbackRegistry.trigger(
+ "connection.established",
+ connectionId=connection.id,
+ authority=str(getattr(connection.authority, "value", connection.authority) or "google"),
+ userId=str(user.id),
+ )
+ except Exception as _cbErr:
+ logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr)
+
return HTMLResponse(
content=f"""
diff --git a/modules/routes/routeSecurityMsft.py b/modules/routes/routeSecurityMsft.py
index cc4cb87b..e087a44c 100644
--- a/modules/routes/routeSecurityMsft.py
+++ b/modules/routes/routeSecurityMsft.py
@@ -420,6 +420,18 @@ async def auth_connect_callback(
)
interface.saveConnectionToken(token)
+ try:
+ from modules.shared.callbackRegistry import callbackRegistry
+
+ callbackRegistry.trigger(
+ "connection.established",
+ connectionId=connection.id,
+ authority=str(getattr(connection.authority, "value", connection.authority) or "msft"),
+ userId=str(user.id),
+ )
+ except Exception as _cbErr:
+ logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr)
+
return HTMLResponse(
content=f"""
diff --git a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py
index 57490eba..0267e2fd 100644
--- a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py
+++ b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py
@@ -203,6 +203,8 @@ class KnowledgeService:
contentObjects=job.contentObjects or [],
structure=structure,
containerPath=job.containerPath,
+ sourceKind=job.sourceKind,
+ connectionId=(job.provenance or {}).get("connectionId"),
)
except Exception as exc:
logger.error(
@@ -254,6 +256,31 @@ class KnowledgeService:
index=index,
)
+ def purgeConnection(self, connectionId: str) -> Dict[str, int]:
+ """Delete every FileContentIndex + ContentChunk linked to a UserConnection.
+
+ Called on `connection.revoked` events so the knowledge corpus never
+ holds chunks the user has withdrawn access to. Returns deletion counts
+ for observability.
+ """
+ if not connectionId:
+ return {"indexRows": 0, "chunks": 0}
+ startMs = time.time()
+ result = self._knowledgeDb.deleteFileContentIndexByConnectionId(connectionId)
+ logger.info(
+ "ingestion.connection.purged connectionId=%s rows=%d chunks=%d durationMs=%d",
+ connectionId, result["indexRows"], result["chunks"],
+ int((time.time() - startMs) * 1000),
+ extra={
+ "event": "ingestion.connection.purged",
+ "connectionId": connectionId,
+ "indexRows": result["indexRows"],
+ "chunks": result["chunks"],
+ "durationMs": int((time.time() - startMs) * 1000),
+ },
+ )
+ return result
+
def getIngestionStatus(
self, handleOrJobId: Union[IngestionHandle, str]
) -> Dict[str, Any]:
@@ -362,6 +389,8 @@ class KnowledgeService:
contentObjects: List[Dict[str, Any]] = None,
structure: Dict[str, Any] = None,
containerPath: str = None,
+ sourceKind: str = "file",
+ connectionId: Optional[str] = None,
) -> FileContentIndex:
"""Index a file's content objects and create embeddings for text chunks.
@@ -384,39 +413,41 @@ class KnowledgeService:
"""
contentObjects = contentObjects or []
- # 1. Resolve scope fields from FileItem (Single Source of Truth)
- # FileItem lives in poweron_management; its scope/mandateId/featureInstanceId
- # are authoritative and must be mirrored onto the FileContentIndex.
+ # 1. Resolve scope fields from FileItem (Single Source of Truth) for
+ # uploaded files. Connector-sourced ingestion (sharepoint_item,
+ # outlook_message, ...) has no FileItem row — trust the caller's
+ # scope + ids directly.
resolvedScope = "personal"
resolvedMandateId = mandateId
resolvedFeatureInstanceId = featureInstanceId
resolvedUserId = userId
_shouldNeutralize = False
- try:
- from modules.datamodels.datamodelFiles import FileItem as _FileItem
- _dbComponent = getattr(self._context, "interfaceDbComponent", None)
- _fileRecords = _dbComponent.getRecordset(_FileItem, recordFilter={"id": fileId}) if _dbComponent else []
- if not _fileRecords:
- from modules.interfaces.interfaceDbManagement import ComponentObjects
- _row = ComponentObjects().db._loadRecord(_FileItem, fileId)
- if _row:
- _fileRecords = [_row]
- if _fileRecords:
- _fileRecord = _fileRecords[0]
- _get = (lambda k, d=None: _fileRecord.get(k, d)) if isinstance(_fileRecord, dict) else (lambda k, d=None: getattr(_fileRecord, k, d))
- _shouldNeutralize = bool(_get("neutralize", False))
- _fileScope = _get("scope")
- if _fileScope:
- resolvedScope = _fileScope
- if not resolvedMandateId:
- resolvedMandateId = str(_get("mandateId", "") or "")
- if not resolvedFeatureInstanceId:
- resolvedFeatureInstanceId = str(_get("featureInstanceId", "") or "")
- _fileCreatedBy = _get("sysCreatedBy")
- if _fileCreatedBy:
- resolvedUserId = str(_fileCreatedBy)
- except Exception:
- pass
+ if sourceKind == "file":
+ try:
+ from modules.datamodels.datamodelFiles import FileItem as _FileItem
+ _dbComponent = getattr(self._context, "interfaceDbComponent", None)
+ _fileRecords = _dbComponent.getRecordset(_FileItem, recordFilter={"id": fileId}) if _dbComponent else []
+ if not _fileRecords:
+ from modules.interfaces.interfaceDbManagement import ComponentObjects
+ _row = ComponentObjects().db._loadRecord(_FileItem, fileId)
+ if _row:
+ _fileRecords = [_row]
+ if _fileRecords:
+ _fileRecord = _fileRecords[0]
+ _get = (lambda k, d=None: _fileRecord.get(k, d)) if isinstance(_fileRecord, dict) else (lambda k, d=None: getattr(_fileRecord, k, d))
+ _shouldNeutralize = bool(_get("neutralize", False))
+ _fileScope = _get("scope")
+ if _fileScope:
+ resolvedScope = _fileScope
+ if not resolvedMandateId:
+ resolvedMandateId = str(_get("mandateId", "") or "")
+ if not resolvedFeatureInstanceId:
+ resolvedFeatureInstanceId = str(_get("featureInstanceId", "") or "")
+ _fileCreatedBy = _get("sysCreatedBy")
+ if _fileCreatedBy:
+ resolvedUserId = str(_fileCreatedBy)
+ except Exception:
+ pass
# 2. Create FileContentIndex with correct scope from the start
index = FileContentIndex(
@@ -425,6 +456,8 @@ class KnowledgeService:
featureInstanceId=resolvedFeatureInstanceId,
mandateId=resolvedMandateId,
scope=resolvedScope,
+ sourceKind=sourceKind,
+ connectionId=connectionId,
fileName=fileName,
mimeType=mimeType,
containerPath=containerPath,
@@ -601,7 +634,12 @@ class KnowledgeService:
Formatted context string for injection into the agent's system prompt.
"""
queryVector = await self._embedSingle(currentPrompt)
+ logger.debug(
+ "buildAgentContext.start userId=%s featureInstanceId=%s mandateId=%s isSysAdmin=%s prompt=%r",
+ userId, featureInstanceId, mandateId, isSysAdmin, (currentPrompt or "")[:120],
+ )
if not queryVector:
+ logger.debug("buildAgentContext.abort reason=no_query_vector")
return ""
builder = _ContextBuilder(budget=contextBudget)
@@ -628,9 +666,14 @@ class KnowledgeService:
featureInstanceId=featureInstanceId,
mandateId=mandateId,
limit=15,
- minScore=0.65,
+ minScore=0.35,
isSysAdmin=isSysAdmin,
)
+ logger.debug(
+ "buildAgentContext.layer1 instanceChunks=%d top_scores=%s",
+ len(instanceChunks),
+ [round(float(c.get("_score", 0) or 0), 3) for c in (instanceChunks or [])[:3]],
+ )
if instanceChunks:
builder.add(priority=1, label="Relevant Documents", items=instanceChunks, maxChars=4000)
@@ -639,7 +682,7 @@ class KnowledgeService:
queryVector=queryVector,
workflowId=workflowId,
limit=10,
- minScore=0.55,
+ minScore=0.35,
)
if roundMemories:
memItems = []
@@ -677,7 +720,7 @@ class KnowledgeService:
scope="mandate",
mandateId=mandateId,
limit=10,
- minScore=0.7,
+ minScore=0.35,
isSysAdmin=isSysAdmin,
)
if mandateChunks:
@@ -693,7 +736,12 @@ class KnowledgeService:
maxChars=500,
)
- return builder.build()
+ _result = builder.build()
+ logger.debug(
+ "buildAgentContext.done totalChars=%d userId=%s",
+ len(_result), userId,
+ )
+ return _result
# =========================================================================
# Workflow Memory
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py
new file mode 100644
index 00000000..51acb71c
--- /dev/null
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py
@@ -0,0 +1,196 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Connection-lifecycle consumer bridging OAuth events to ingestion jobs.
+
+Subscribes to `connection.established` and `connection.revoked` callbacks
+emitted by the OAuth callbacks / connection management routes and dispatches:
+
+- `connection.established` -> enqueue a `connection.bootstrap` BackgroundJob
+ that walks the connector and ingests all reachable items via
+ KnowledgeService.requestIngestion (file-like or virtual documents).
+- `connection.revoked` -> run `KnowledgeService.purgeConnection` synchronously
+ so the knowledge corpus releases the data before the UI confirms the revoke.
+
+The consumer is registered once at process boot (see `app.py` lifespan).
+It intentionally does NOT hold a per-user service context; each callback
+creates whatever context it needs from the UserConnection row itself.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from typing import Any, Dict, Optional
+
+from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
+from modules.shared.callbackRegistry import callbackRegistry
+from modules.serviceCenter.services.serviceBackgroundJobs import (
+ registerJobHandler,
+ startJob,
+)
+
+logger = logging.getLogger(__name__)
+
+BOOTSTRAP_JOB_TYPE = "connection.bootstrap"
+
+_registered = False
+
+
+def _onConnectionEstablished(
+ *,
+ connectionId: str,
+ authority: str,
+ userId: Optional[str] = None,
+ **kwargs: Any,
+) -> None:
+ """Fire-and-forget bootstrap enqueue for a freshly connected UserConnection."""
+ if not connectionId:
+ logger.warning("connection.established without connectionId; ignoring")
+ return
+ payload: Dict[str, Any] = {
+ "connectionId": connectionId,
+ "authority": (authority or "").lower(),
+ "userId": userId,
+ }
+ logger.info(
+ "ingestion.connection.bootstrap.queued connectionId=%s authority=%s",
+ connectionId, authority,
+ extra={
+ "event": "ingestion.connection.bootstrap.queued",
+ "connectionId": connectionId,
+ "authority": authority,
+ },
+ )
+
+ async def _enqueue() -> None:
+ try:
+ await startJob(
+ BOOTSTRAP_JOB_TYPE,
+ payload,
+ triggeredBy=userId,
+ )
+ except Exception as exc:
+ logger.error(
+ "ingestion.connection.bootstrap.enqueue_failed connectionId=%s error=%s",
+ connectionId, exc, exc_info=True,
+ )
+
+ try:
+ # Normal case: the callback fires inside the running server loop.
+ loop = asyncio.get_running_loop()
+ except RuntimeError:
+ # No running loop (sync caller outside the server): run to completion.
+ asyncio.run(_enqueue())
+ else:
+ loop.create_task(_enqueue())
+
+
+def _onConnectionRevoked(
+ *,
+ connectionId: str,
+ authority: Optional[str] = None,
+ userId: Optional[str] = None,
+ reason: Optional[str] = None,
+ **kwargs: Any,
+) -> None:
+ """Run the knowledge purge synchronously so UI feedback is authoritative."""
+ if not connectionId:
+ logger.warning("connection.revoked without connectionId; ignoring")
+ return
+ try:
+ # Purge lives on the DB interface to avoid ServiceCenter/user-context
+ # plumbing here; the service method is a thin wrapper on top of this.
+ result = getKnowledgeInterface(None).deleteFileContentIndexByConnectionId(connectionId)
+ except Exception as exc:
+ logger.error(
+ "ingestion.connection.purged.failed connectionId=%s error=%s",
+ connectionId, exc, exc_info=True,
+ )
+ return
+ logger.info(
+ "ingestion.connection.purged connectionId=%s authority=%s reason=%s rows=%d chunks=%d",
+ connectionId, authority, reason,
+ result.get("indexRows", 0), result.get("chunks", 0),
+ extra={
+ "event": "ingestion.connection.purged",
+ "connectionId": connectionId,
+ "authority": authority,
+ "reason": reason,
+ "indexRows": result.get("indexRows", 0),
+ "chunks": result.get("chunks", 0),
+ },
+ )
+
+
+async def _bootstrapJobHandler(
+ job: Dict[str, Any],
+ progressCb,
+) -> Dict[str, Any]:
+ """Dispatch bootstrap by authority. Each authority runs its own sub-bootstraps."""
+ payload = job.get("payload") or {}
+ connectionId = payload.get("connectionId")
+ authority = (payload.get("authority") or "").lower()
+ if not connectionId:
+ raise ValueError("connection.bootstrap requires payload.connectionId")
+
+ progressCb(5, f"resolving {authority} connection")
+
+ if authority == "msft":
+ from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint import (
+ bootstrapSharepoint,
+ )
+ from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncOutlook import (
+ bootstrapOutlook,
+ )
+
+ progressCb(10, "sharepoint + outlook")
+ spResult, olResult = await asyncio.gather(
+ bootstrapSharepoint(connectionId=connectionId, progressCb=progressCb),
+ bootstrapOutlook(connectionId=connectionId, progressCb=progressCb),
+ return_exceptions=True,
+ )
+
+ def _normalize(res: Any, label: str) -> Dict[str, Any]:
+ if isinstance(res, Exception):
+ logger.error(
+ "ingestion.connection.bootstrap.failed part=%s connectionId=%s error=%s",
+ label, connectionId, res, exc_info=res,
+ )
+ return {"error": str(res)}
+ return res or {}
+
+ return {
+ "connectionId": connectionId,
+ "authority": authority,
+ "sharepoint": _normalize(spResult, "sharepoint"),
+ "outlook": _normalize(olResult, "outlook"),
+ }
+
+ logger.info(
+ "ingestion.connection.bootstrap.skipped reason=P1_pilot_scope authority=%s connectionId=%s",
+ authority, connectionId,
+ extra={
+ "event": "ingestion.connection.bootstrap.skipped",
+ "authority": authority,
+ "connectionId": connectionId,
+ "reason": "P1_pilot_scope",
+ },
+ )
+ return {
+ "connectionId": connectionId,
+ "authority": authority,
+ "skipped": True,
+ "reason": "P1_pilot_scope",
+ }
+
+
+def registerKnowledgeIngestionConsumer() -> None:
+ """Register callback subscribers + background job handler. Idempotent."""
+ global _registered
+ if _registered:
+ return
+ callbackRegistry.register("connection.established", _onConnectionEstablished)
+ callbackRegistry.register("connection.revoked", _onConnectionRevoked)
+ registerJobHandler(BOOTSTRAP_JOB_TYPE, _bootstrapJobHandler)
+ _registered = True
+ logger.info("KnowledgeIngestionConsumer registered (established/revoked + %s handler)", BOOTSTRAP_JOB_TYPE)
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py
new file mode 100644
index 00000000..b3f425ac
--- /dev/null
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py
@@ -0,0 +1,551 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Outlook bootstrap for the unified knowledge ingestion lane.
+
+Unlike SharePoint, Outlook messages are "virtual documents" — we never persist
+file bytes in the store. Each message becomes a `sourceKind="outlook_message"`
+IngestionJob whose `contentObjects` carry the header, snippet and cleaned body
+so retrieval can show a compact answer without fetching Graph again.
+
+Attachments are optional (`includeAttachments` limit flag) and enqueued as
+child jobs with `sourceKind="outlook_attachment"` + `provenance.parentId`.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import logging
+import time
+from dataclasses import dataclass, field
+from typing import Any, Callable, Dict, List, Optional
+
+from modules.serviceCenter.services.serviceKnowledge.subTextClean import cleanEmailBody
+
+logger = logging.getLogger(__name__)
+
+MAX_MESSAGES_DEFAULT = 500
+MAX_FOLDERS_DEFAULT = 5
+MAX_BODY_CHARS_DEFAULT = 8000
+MAX_ATTACHMENT_BYTES_DEFAULT = 10 * 1024 * 1024
+WELL_KNOWN_FOLDERS = ("inbox", "sentitems")
+
+
+@dataclass
+class OutlookBootstrapLimits:
+ maxMessages: int = MAX_MESSAGES_DEFAULT
+ maxFolders: int = MAX_FOLDERS_DEFAULT
+ maxBodyChars: int = MAX_BODY_CHARS_DEFAULT
+ includeAttachments: bool = False
+ maxAttachmentBytes: int = MAX_ATTACHMENT_BYTES_DEFAULT
+ # Only fetch messages newer than N days. None disables filter.
+ maxAgeDays: Optional[int] = 90
+
+
+@dataclass
+class OutlookBootstrapResult:
+ connectionId: str
+ indexed: int = 0
+ skippedDuplicate: int = 0
+ skippedPolicy: int = 0
+ failed: int = 0
+ attachmentsIndexed: int = 0
+ errors: List[str] = field(default_factory=list)
+
+
+def _syntheticMessageId(connectionId: str, messageId: str) -> str:
+ token = hashlib.sha256(f"{connectionId}:{messageId}".encode("utf-8")).hexdigest()[:16]
+ return f"om:{connectionId[:8]}:{token}"
+
+
+def _syntheticAttachmentId(connectionId: str, messageId: str, attachmentId: str) -> str:
+ token = hashlib.sha256(
+ f"{connectionId}:{messageId}:{attachmentId}".encode("utf-8")
+ ).hexdigest()[:16]
+ return f"oa:{connectionId[:8]}:{token}"
+
+
+def _extractRecipient(recipient: Dict[str, Any]) -> str:
+ email = (recipient or {}).get("emailAddress") or {}
+ name = email.get("name") or ""
+ addr = email.get("address") or ""
+ if name and addr:
+ return f"{name} <{addr}>"
+ return addr or name
+
+
+def _joinRecipients(recipients: List[Dict[str, Any]]) -> str:
+ return ", ".join(filter(None, [_extractRecipient(r) for r in recipients or []]))
+
+
+def _buildContentObjects(message: Dict[str, Any], maxBodyChars: int) -> List[Dict[str, Any]]:
+ subject = message.get("subject") or "(no subject)"
+ fromAddr = _extractRecipient(message.get("from") or {})
+ toAddr = _joinRecipients(message.get("toRecipients") or [])
+ ccAddr = _joinRecipients(message.get("ccRecipients") or [])
+ received = message.get("receivedDateTime") or ""
+ snippet = message.get("bodyPreview") or ""
+
+ body = message.get("body") or {}
+ bodyContent = body.get("content") or ""
+ # cleanEmailBody handles HTML and plain text itself and returns "" for
+ # empty input, so no branching on Graph's body contentType is needed.
+ cleanedBody = cleanEmailBody(bodyContent, maxChars=maxBodyChars) if bodyContent else ""
+
+ parts: List[Dict[str, Any]] = []
+ header = (
+ f"Subject: {subject}\n"
+ f"From: {fromAddr}\n"
+ f"To: {toAddr}\n"
+ + (f"Cc: {ccAddr}\n" if ccAddr else "")
+ + f"Date: {received}"
+ )
+ parts.append({
+ "contentObjectId": "header",
+ "contentType": "text",
+ "data": header,
+ "contextRef": {"part": "header"},
+ })
+ if snippet:
+ parts.append({
+ "contentObjectId": "snippet",
+ "contentType": "text",
+ "data": snippet,
+ "contextRef": {"part": "snippet"},
+ })
+ if cleanedBody:
+ parts.append({
+ "contentObjectId": "body",
+ "contentType": "text",
+ "data": cleanedBody,
+ "contextRef": {"part": "body"},
+ })
+ return parts
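+# Minimal shape sketch (assumed input {"subject": "Hi"} only): returns a single
+# header part, e.g. [{"contentObjectId": "header", "contentType": "text",
+# "data": "Subject: Hi\nFrom: \nTo: \nDate: ", "contextRef": {"part": "header"}}].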
+
+
+async def bootstrapOutlook(
+ connectionId: str,
+ *,
+ progressCb: Optional[Callable[[int, Optional[str]], None]] = None,
+ adapter: Any = None,
+ connection: Any = None,
+ knowledgeService: Any = None,
+ limits: Optional[OutlookBootstrapLimits] = None,
+) -> Dict[str, Any]:
+ """Enumerate Outlook folders (inbox + sent by default) and ingest messages."""
+ limits = limits or OutlookBootstrapLimits()
+ startMs = time.time()
+ result = OutlookBootstrapResult(connectionId=connectionId)
+
+ logger.info(
+ "ingestion.connection.bootstrap.started part=outlook connectionId=%s",
+ connectionId,
+ extra={
+ "event": "ingestion.connection.bootstrap.started",
+ "part": "outlook",
+ "connectionId": connectionId,
+ },
+ )
+
+ if adapter is None or knowledgeService is None or connection is None:
+ adapter, connection, knowledgeService = await _resolveDependencies(connectionId)
+
+ mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else ""
+ userId = str(getattr(connection, "userId", "") or "") if connection is not None else ""
+
+ folderIds = await _selectFolderIds(adapter, limits)
+ for folderId in folderIds:
+ if result.indexed + result.skippedDuplicate >= limits.maxMessages:
+ break
+ try:
+ await _ingestFolder(
+ adapter=adapter,
+ knowledgeService=knowledgeService,
+ connectionId=connectionId,
+ mandateId=mandateId,
+ userId=userId,
+ folderId=folderId,
+ limits=limits,
+ result=result,
+ progressCb=progressCb,
+ )
+ except Exception as exc:
+ logger.error("outlook ingestion folder %s failed: %s", folderId, exc, exc_info=True)
+ result.errors.append(f"folder({folderId}): {exc}")
+
+ return _finalizeResult(connectionId, result, startMs)
+
+
+async def _resolveDependencies(connectionId: str):
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.auth import TokenManager
+ from modules.connectors.providerMsft.connectorMsft import MsftConnector
+ from modules.serviceCenter import getService
+ from modules.serviceCenter.context import ServiceCenterContext
+ from modules.security.rootAccess import getRootUser
+
+ rootInterface = getRootInterface()
+ connection = rootInterface.getUserConnectionById(connectionId)
+ if connection is None:
+ raise ValueError(f"UserConnection not found: {connectionId}")
+
+ token = TokenManager().getFreshToken(connectionId)
+ if not token or not token.tokenAccess:
+ raise ValueError(f"No valid token for connection {connectionId}")
+
+ provider = MsftConnector(connection, token.tokenAccess)
+ adapter = provider.getServiceAdapter("outlook")
+
+ rootUser = getRootUser()
+ ctx = ServiceCenterContext(
+ user=rootUser,
+ mandate_id=str(getattr(connection, "mandateId", "") or ""),
+ )
+ knowledgeService = getService("knowledge", ctx)
+ return adapter, connection, knowledgeService
+
+
+async def _selectFolderIds(adapter, limits: OutlookBootstrapLimits) -> List[str]:
+ """Prefer well-known folders (inbox, sentitems); fall back to browse()."""
+ folderIds: List[str] = []
+ for wellKnown in WELL_KNOWN_FOLDERS:
+ if len(folderIds) >= limits.maxFolders:
+ break
+ try:
+ row = await adapter._graphGet(f"me/mailFolders/{wellKnown}")
+ except Exception:
+ row = None
+ if isinstance(row, dict) and "error" not in row and row.get("id"):
+ folderIds.append(row["id"])
+
+ if len(folderIds) < limits.maxFolders:
+ try:
+ entries = await adapter.browse("/")
+ except Exception:
+ entries = []
+ for entry in entries:
+ metadata = getattr(entry, "metadata", {}) or {}
+ fid = metadata.get("id")
+ if fid and fid not in folderIds:
+ folderIds.append(fid)
+ if len(folderIds) >= limits.maxFolders:
+ break
+ return folderIds
+
+
+async def _ingestFolder(
+ *,
+ adapter,
+ knowledgeService,
+ connectionId: str,
+ mandateId: str,
+ userId: str,
+ folderId: str,
+ limits: OutlookBootstrapLimits,
+ result: OutlookBootstrapResult,
+ progressCb: Optional[Callable[[int, Optional[str]], None]],
+) -> None:
+ remaining = limits.maxMessages - (result.indexed + result.skippedDuplicate)
+ if remaining <= 0:
+ return
+
+ pageSize = min(100, remaining)
+ select = (
+ "id,subject,from,toRecipients,ccRecipients,receivedDateTime,"
+ "bodyPreview,body,internetMessageId,hasAttachments,changeKey"
+ )
+ endpoint: Optional[str] = (
+ f"me/mailFolders/{folderId}/messages"
+ f"?$top={pageSize}&$orderby=receivedDateTime desc&$select={select}"
+ )
+
+ # Apply the age filter in Graph itself ($filter on receivedDateTime) so
+ # we avoid shipping ancient messages we would only discard client-side.
+ if limits.maxAgeDays:
+ from datetime import datetime, timezone, timedelta
+
+ cutoff = datetime.now(timezone.utc) - timedelta(days=limits.maxAgeDays)
+ cutoffIso = cutoff.strftime("%Y-%m-%dT%H:%M:%SZ")
+ endpoint = f"{endpoint}&$filter=receivedDateTime ge {cutoffIso}"
+
+ while endpoint and (result.indexed + result.skippedDuplicate) < limits.maxMessages:
+ try:
+ page = await adapter._graphGet(endpoint)
+ except Exception as exc:
+ logger.warning("outlook graph page failed for folder %s: %s", folderId, exc)
+ result.errors.append(f"graph({folderId}): {exc}")
+ return
+ if not isinstance(page, dict) or "error" in page:
+ err = (page or {}).get("error") if isinstance(page, dict) else "unknown"
+ logger.warning("outlook graph page error for folder %s: %s", folderId, err)
+ result.errors.append(f"graph({folderId}): {err}")
+ return
+
+ for message in page.get("value", []) or []:
+ if result.indexed + result.skippedDuplicate >= limits.maxMessages:
+ break
+ await _ingestMessage(
+ adapter=adapter,
+ knowledgeService=knowledgeService,
+ connectionId=connectionId,
+ mandateId=mandateId,
+ userId=userId,
+ message=message,
+ limits=limits,
+ result=result,
+ progressCb=progressCb,
+ )
+
+ nextLink = page.get("@odata.nextLink")
+ if not nextLink:
+ break
+ # Strip Graph base so adapter._graphGet accepts the relative path.
+ from modules.connectors.providerMsft.connectorMsft import _stripGraphBase
+
+ endpoint = _stripGraphBase(nextLink)
+
+
+async def _ingestMessage(
+ *,
+ adapter,
+ knowledgeService,
+ connectionId: str,
+ mandateId: str,
+ userId: str,
+ message: Dict[str, Any],
+ limits: OutlookBootstrapLimits,
+ result: OutlookBootstrapResult,
+ progressCb: Optional[Callable[[int, Optional[str]], None]],
+) -> None:
+ from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
+
+ messageId = message.get("id")
+ if not messageId:
+ result.skippedPolicy += 1
+ return
+ revision = message.get("changeKey") or message.get("internetMessageId")
+ subject = message.get("subject") or "(no subject)"
+ syntheticId = _syntheticMessageId(connectionId, messageId)
+ fileName = f"{subject[:80].strip()}.eml" if subject else f"{messageId}.eml"
+
+ contentObjects = _buildContentObjects(message, limits.maxBodyChars)
+ # The header part is always emitted, so `contentObjects` is never empty.
+ try:
+ handle = await knowledgeService.requestIngestion(
+ IngestionJob(
+ sourceKind="outlook_message",
+ sourceId=syntheticId,
+ fileName=fileName,
+ mimeType="message/rfc822",
+ userId=userId,
+ mandateId=mandateId,
+ contentObjects=contentObjects,
+ contentVersion=revision,
+ provenance={
+ "connectionId": connectionId,
+ "authority": "msft",
+ "service": "outlook",
+ "externalItemId": messageId,
+ "internetMessageId": message.get("internetMessageId"),
+ "tier": "body",
+ },
+ )
+ )
+ except Exception as exc:
+ logger.error("outlook ingestion %s failed: %s", messageId, exc, exc_info=True)
+ result.failed += 1
+ result.errors.append(f"ingest({messageId}): {exc}")
+ return
+
+ if handle.status == "duplicate":
+ result.skippedDuplicate += 1
+ elif handle.status == "indexed":
+ result.indexed += 1
+ else:
+ result.failed += 1
+
+ if limits.includeAttachments and message.get("hasAttachments"):
+ try:
+ await _ingestAttachments(
+ adapter=adapter,
+ knowledgeService=knowledgeService,
+ connectionId=connectionId,
+ mandateId=mandateId,
+ userId=userId,
+ messageId=messageId,
+ parentSyntheticId=syntheticId,
+ limits=limits,
+ result=result,
+ )
+ except Exception as exc:
+ logger.warning("outlook attachments %s failed: %s", messageId, exc)
+ result.errors.append(f"attachments({messageId}): {exc}")
+
+ if progressCb is not None and (result.indexed + result.skippedDuplicate) % 50 == 0:
+ processed = result.indexed + result.skippedDuplicate
+ try:
+ progressCb(
+ min(90, 10 + int(80 * processed / max(1, limits.maxMessages))),
+ f"outlook processed={processed}",
+ )
+ except Exception:
+ pass
+ logger.info(
+ "ingestion.connection.bootstrap.progress part=outlook processed=%d skippedDup=%d failed=%d",
+ processed, result.skippedDuplicate, result.failed,
+ extra={
+ "event": "ingestion.connection.bootstrap.progress",
+ "part": "outlook",
+ "connectionId": connectionId,
+ "processed": processed,
+ "skippedDup": result.skippedDuplicate,
+ "failed": result.failed,
+ },
+ )
+
+ await asyncio.sleep(0)
+
+
+async def _ingestAttachments(
+ *,
+ adapter,
+ knowledgeService,
+ connectionId: str,
+ mandateId: str,
+ userId: str,
+ messageId: str,
+ parentSyntheticId: str,
+ limits: OutlookBootstrapLimits,
+ result: OutlookBootstrapResult,
+) -> None:
+ """Child ingestion jobs for file attachments (skip inline & oversized)."""
+ from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
+ from modules.datamodels.datamodelExtraction import ExtractionOptions
+ from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction
+ from modules.serviceCenter.services.serviceExtraction.subRegistry import (
+ ExtractorRegistry, ChunkerRegistry,
+ )
+ import base64
+
+ page = await adapter._graphGet(f"me/messages/{messageId}/attachments")
+ if not isinstance(page, dict) or "error" in page:
+ return
+
+ extractorRegistry = ExtractorRegistry()
+ chunkerRegistry = ChunkerRegistry()
+
+ for attachment in page.get("value", []) or []:
+ if attachment.get("@odata.type") != "#microsoft.graph.fileAttachment":
+ continue
+ if attachment.get("isInline"):
+ continue
+ size = int(attachment.get("size") or 0)
+ if size and size > limits.maxAttachmentBytes:
+ result.skippedPolicy += 1
+ continue
+ contentBytesB64 = attachment.get("contentBytes")
+ if not contentBytesB64:
+ continue
+ try:
+ rawBytes = base64.b64decode(contentBytesB64)
+ except Exception:
+ result.skippedPolicy += 1
+ continue
+ fileName = attachment.get("name") or "attachment"
+ mimeType = attachment.get("contentType") or "application/octet-stream"
+ attachmentId = attachment.get("id") or fileName
+ syntheticId = _syntheticAttachmentId(connectionId, messageId, attachmentId)
+
+ try:
+ extracted = runExtraction(
+ extractorRegistry, chunkerRegistry,
+ rawBytes, fileName, mimeType,
+ ExtractionOptions(mergeStrategy=None),
+ )
+ except Exception as exc:
+ logger.warning("outlook attachment extract %s failed: %s", attachmentId, exc)
+ result.failed += 1
+ continue
+
+ contentObjects: List[Dict[str, Any]] = []
+ for part in getattr(extracted, "parts", None) or []:
+ data = getattr(part, "data", None) or ""
+ if not data or not str(data).strip():
+ continue
+ typeGroup = getattr(part, "typeGroup", "text") or "text"
+ contentType = "text"
+ if typeGroup == "image":
+ contentType = "image"
+ elif typeGroup in ("binary", "container"):
+ contentType = "other"
+ contentObjects.append({
+ "contentObjectId": getattr(part, "id", ""),
+ "contentType": contentType,
+ "data": data,
+ "contextRef": {
+ "containerPath": fileName,
+ "location": getattr(part, "label", None) or "attachment",
+ **(getattr(part, "metadata", None) or {}),
+ },
+ })
+ if not contentObjects:
+ result.skippedPolicy += 1
+ continue
+
+ try:
+ await knowledgeService.requestIngestion(
+ IngestionJob(
+ sourceKind="outlook_attachment",
+ sourceId=syntheticId,
+ fileName=fileName,
+ mimeType=mimeType,
+ userId=userId,
+ mandateId=mandateId,
+ contentObjects=contentObjects,
+ provenance={
+ "connectionId": connectionId,
+ "authority": "msft",
+ "service": "outlook",
+ "parentId": parentSyntheticId,
+ "externalItemId": attachmentId,
+ "parentMessageId": messageId,
+ },
+ )
+ )
+ result.attachmentsIndexed += 1
+ except Exception as exc:
+ logger.warning("outlook attachment ingest %s failed: %s", attachmentId, exc)
+ result.failed += 1
+
+
+def _finalizeResult(connectionId: str, result: OutlookBootstrapResult, startMs: float) -> Dict[str, Any]:
+ durationMs = int((time.time() - startMs) * 1000)
+ logger.info(
+ "ingestion.connection.bootstrap.done part=outlook connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d attachments=%d failed=%d durationMs=%d",
+ connectionId,
+ result.indexed, result.skippedDuplicate, result.skippedPolicy,
+ result.attachmentsIndexed, result.failed, durationMs,
+ extra={
+ "event": "ingestion.connection.bootstrap.done",
+ "part": "outlook",
+ "connectionId": connectionId,
+ "indexed": result.indexed,
+ "skippedDup": result.skippedDuplicate,
+ "skippedPolicy": result.skippedPolicy,
+ "attachmentsIndexed": result.attachmentsIndexed,
+ "failed": result.failed,
+ "durationMs": durationMs,
+ },
+ )
+ return {
+ "connectionId": result.connectionId,
+ "indexed": result.indexed,
+ "skippedDuplicate": result.skippedDuplicate,
+ "skippedPolicy": result.skippedPolicy,
+ "attachmentsIndexed": result.attachmentsIndexed,
+ "failed": result.failed,
+ "durationMs": durationMs,
+ "errors": result.errors[:20],
+ }
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py
new file mode 100644
index 00000000..0bceecac
--- /dev/null
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py
@@ -0,0 +1,425 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""SharePoint bootstrap for the unified knowledge ingestion lane.
+
+Walks the SharePoint drive(s) reachable via a UserConnection, downloads each
+file-like item, runs the standard content extraction pipeline and hands the
+result to `KnowledgeService.requestIngestion`. Idempotency is provided by the
+ingestion façade itself; repeat bootstraps therefore produce
+`ingestion.skipped.duplicate` for every unchanged item because we pass the
+Graph `eTag` as `contentVersion`.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import logging
+import time
+from dataclasses import dataclass, field
+from typing import Any, Callable, Dict, List, Optional
+
+from modules.datamodels.datamodelExtraction import ExtractionOptions
+
+logger = logging.getLogger(__name__)
+
+MAX_ITEMS_DEFAULT = 500
+MAX_BYTES_DEFAULT = 200 * 1024 * 1024
+MAX_FILE_SIZE_DEFAULT = 25 * 1024 * 1024
+SKIP_MIME_PREFIXES_DEFAULT = ("video/", "audio/")
+MAX_DEPTH_DEFAULT = 4
+MAX_SITES_DEFAULT = 3
+
+
+@dataclass
+class SharepointBootstrapLimits:
+ maxItems: int = MAX_ITEMS_DEFAULT
+ maxBytes: int = MAX_BYTES_DEFAULT
+ maxFileSize: int = MAX_FILE_SIZE_DEFAULT
+ skipMimePrefixes: tuple = SKIP_MIME_PREFIXES_DEFAULT
+ maxDepth: int = MAX_DEPTH_DEFAULT
+ maxSites: int = MAX_SITES_DEFAULT
+
+
+@dataclass
+class SharepointBootstrapResult:
+ connectionId: str
+ indexed: int = 0
+ skippedDuplicate: int = 0
+ skippedPolicy: int = 0
+ failed: int = 0
+ bytesProcessed: int = 0
+ errors: List[str] = field(default_factory=list)
+
+
+def _syntheticFileId(connectionId: str, externalItemId: str) -> str:
+ """Deterministic synthetic FileContentIndex id for a SharePoint item.
+
+ Stable across bootstraps → idempotency works; independent of file name so
+ moves/renames don't duplicate chunks.
+ """
+ token = hashlib.sha256(f"{connectionId}:{externalItemId}".encode("utf-8")).hexdigest()[:16]
+ return f"sp:{connectionId[:8]}:{token}"
+
+
+def _toContentObjects(extracted, fileName: str) -> List[Dict[str, Any]]:
+ """Translate ExtractionResult → content objects accepted by requestIngestion."""
+ parts = getattr(extracted, "parts", None) or []
+ out: List[Dict[str, Any]] = []
+ for part in parts:
+ data = getattr(part, "data", None) or ""
+ if not data or not str(data).strip():
+ continue
+ typeGroup = getattr(part, "typeGroup", "text") or "text"
+ contentType = "text"
+ if typeGroup == "image":
+ contentType = "image"
+ elif typeGroup in ("binary", "container"):
+ contentType = "other"
+ out.append({
+ "contentObjectId": getattr(part, "id", ""),
+ "contentType": contentType,
+ "data": data,
+ "contextRef": {
+ "containerPath": fileName,
+ "location": getattr(part, "label", None) or "file",
+ **(getattr(part, "metadata", None) or {}),
+ },
+ })
+ return out
+
+
+async def bootstrapSharepoint(
+ connectionId: str,
+ *,
+ progressCb: Optional[Callable[[int, Optional[str]], None]] = None,
+ adapter: Any = None,
+ connection: Any = None,
+ knowledgeService: Any = None,
+ limits: Optional[SharepointBootstrapLimits] = None,
+ runExtractionFn: Optional[Callable[..., Any]] = None,
+) -> Dict[str, Any]:
+ """Enumerate SharePoint drives and ingest every reachable file via the façade.
+
+ Parameters allow injection for tests; production callers pass only
+ `connectionId` (and optionally a progressCb) and everything else is
+ resolved against the registered services.
+ """
+ limits = limits or SharepointBootstrapLimits()
+ startMs = time.time()
+ result = SharepointBootstrapResult(connectionId=connectionId)
+
+ logger.info(
+ "ingestion.connection.bootstrap.started part=sharepoint connectionId=%s",
+ connectionId,
+ extra={
+ "event": "ingestion.connection.bootstrap.started",
+ "part": "sharepoint",
+ "connectionId": connectionId,
+ },
+ )
+
+ if adapter is None or knowledgeService is None or connection is None:
+ adapter, connection, knowledgeService = await _resolveDependencies(connectionId)
+ if runExtractionFn is None:
+ from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction
+ from modules.serviceCenter.services.serviceExtraction.subRegistry import (
+ ExtractorRegistry, ChunkerRegistry,
+ )
+ extractorRegistry = ExtractorRegistry()
+ chunkerRegistry = ChunkerRegistry()
+
+ def runExtractionFn(bytesData, name, mime, options): # type: ignore[no-redef]
+ return runExtraction(extractorRegistry, chunkerRegistry, bytesData, name, mime, options)
+
+ mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else ""
+ userId = str(getattr(connection, "userId", "") or "") if connection is not None else ""
+
+ try:
+ sites = await adapter.browse("/", limit=limits.maxSites)
+ except Exception as exc:
+ logger.error("sharepoint site discovery failed for %s: %s", connectionId, exc, exc_info=True)
+ result.errors.append(f"site_discovery: {exc}")
+ return _finalizeResult(connectionId, result, startMs)
+
+ for site in sites[: limits.maxSites]:
+ if result.indexed + result.skippedDuplicate >= limits.maxItems:
+ break
+ sitePath = getattr(site, "path", "") or ""
+ try:
+ await _walkFolder(
+ adapter=adapter,
+ knowledgeService=knowledgeService,
+ runExtractionFn=runExtractionFn,
+ connectionId=connectionId,
+ mandateId=mandateId,
+ userId=userId,
+ folderPath=sitePath,
+ depth=0,
+ limits=limits,
+ result=result,
+ progressCb=progressCb,
+ )
+ except Exception as exc:
+ logger.error("sharepoint walk failed for site %s: %s", sitePath, exc, exc_info=True)
+ result.errors.append(f"walk({sitePath}): {exc}")
+
+ return _finalizeResult(connectionId, result, startMs)
+
+
+async def _resolveDependencies(connectionId: str):
+ """Load connection, instantiate SharepointAdapter, and build a KnowledgeService.
+
+ Runs with root privileges: bootstrap is a system operation triggered by an
+ authenticated user via callback; it must not be gated by a per-user
+ service-center context.
+ """
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.auth import TokenManager
+ from modules.connectors.providerMsft.connectorMsft import MsftConnector
+ from modules.serviceCenter import getService
+ from modules.serviceCenter.context import ServiceCenterContext
+ from modules.security.rootAccess import getRootUser
+
+ rootInterface = getRootInterface()
+ connection = rootInterface.getUserConnectionById(connectionId)
+ if connection is None:
+ raise ValueError(f"UserConnection not found: {connectionId}")
+
+ token = TokenManager().getFreshToken(connectionId)
+ if not token or not token.tokenAccess:
+ raise ValueError(f"No valid token for connection {connectionId}")
+
+ provider = MsftConnector(connection, token.tokenAccess)
+ adapter = provider.getServiceAdapter("sharepoint")
+
+ rootUser = getRootUser()
+ ctx = ServiceCenterContext(
+ user=rootUser,
+ mandate_id=str(getattr(connection, "mandateId", "") or ""),
+ )
+ knowledgeService = getService("knowledge", ctx)
+ return adapter, connection, knowledgeService
+
+
+async def _walkFolder(
+ *,
+ adapter,
+ knowledgeService,
+ runExtractionFn,
+ connectionId: str,
+ mandateId: str,
+ userId: str,
+ folderPath: str,
+ depth: int,
+ limits: SharepointBootstrapLimits,
+ result: SharepointBootstrapResult,
+ progressCb: Optional[Callable[[int, Optional[str]], None]],
+) -> None:
+ if depth > limits.maxDepth:
+ return
+ try:
+ entries = await adapter.browse(folderPath)
+ except Exception as exc:
+ logger.warning("sharepoint browse %s failed: %s", folderPath, exc)
+ result.errors.append(f"browse({folderPath}): {exc}")
+ return
+
+ for entry in entries:
+ if result.indexed + result.skippedDuplicate >= limits.maxItems:
+ return
+ if result.bytesProcessed >= limits.maxBytes:
+ return
+
+ entryPath = getattr(entry, "path", "") or ""
+ if getattr(entry, "isFolder", False):
+ await _walkFolder(
+ adapter=adapter,
+ knowledgeService=knowledgeService,
+ runExtractionFn=runExtractionFn,
+ connectionId=connectionId,
+ mandateId=mandateId,
+ userId=userId,
+ folderPath=entryPath,
+ depth=depth + 1,
+ limits=limits,
+ result=result,
+ progressCb=progressCb,
+ )
+ continue
+
+ mimeType = getattr(entry, "mimeType", None) or "application/octet-stream"
+ if any(mimeType.startswith(prefix) for prefix in limits.skipMimePrefixes):
+ result.skippedPolicy += 1
+ continue
+ size = int(getattr(entry, "size", 0) or 0)
+ if size and size > limits.maxFileSize:
+ result.skippedPolicy += 1
+ continue
+
+ metadata = getattr(entry, "metadata", {}) or {}
+ externalItemId = metadata.get("id") or entryPath
+ revision = metadata.get("revision") or metadata.get("lastModifiedDateTime")
+
+ await _ingestOne(
+ adapter=adapter,
+ knowledgeService=knowledgeService,
+ runExtractionFn=runExtractionFn,
+ connectionId=connectionId,
+ mandateId=mandateId,
+ userId=userId,
+ entry=entry,
+ entryPath=entryPath,
+ mimeType=mimeType,
+ externalItemId=externalItemId,
+ revision=revision,
+ limits=limits,
+ result=result,
+ progressCb=progressCb,
+ )
+
+
+async def _ingestOne(
+ *,
+ adapter,
+ knowledgeService,
+ runExtractionFn,
+ connectionId: str,
+ mandateId: str,
+ userId: str,
+ entry,
+ entryPath: str,
+ mimeType: str,
+ externalItemId: str,
+ revision: Optional[str],
+ limits: SharepointBootstrapLimits,
+ result: SharepointBootstrapResult,
+ progressCb: Optional[Callable[[int, Optional[str]], None]],
+) -> None:
+ from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
+
+ syntheticFileId = _syntheticFileId(connectionId, externalItemId)
+ fileName = getattr(entry, "name", "") or externalItemId
+
+ try:
+ fileBytes = await adapter.download(entryPath)
+ except Exception as exc:
+ logger.warning("sharepoint download %s failed: %s", entryPath, exc)
+ result.failed += 1
+ result.errors.append(f"download({entryPath}): {exc}")
+ return
+ if not fileBytes:
+ result.failed += 1
+ return
+
+ result.bytesProcessed += len(fileBytes)
+
+ try:
+ extracted = runExtractionFn(
+ fileBytes, fileName, mimeType,
+ ExtractionOptions(mergeStrategy=None),
+ )
+ except Exception as exc:
+ logger.warning("sharepoint extraction %s failed: %s", entryPath, exc)
+ result.failed += 1
+ result.errors.append(f"extract({entryPath}): {exc}")
+ return
+
+ contentObjects = _toContentObjects(extracted, fileName)
+ if not contentObjects:
+ result.skippedPolicy += 1
+ return
+
+ provenance: Dict[str, Any] = {
+ "connectionId": connectionId,
+ "authority": "msft",
+ "service": "sharepoint",
+ "externalItemId": externalItemId,
+ "externalPath": entryPath,
+ "revision": revision,
+ }
+ try:
+ handle = await knowledgeService.requestIngestion(
+ IngestionJob(
+ sourceKind="sharepoint_item",
+ sourceId=syntheticFileId,
+ fileName=fileName,
+ mimeType=mimeType,
+ userId=userId,
+ mandateId=mandateId,
+ contentObjects=contentObjects,
+ contentVersion=revision,
+ provenance=provenance,
+ )
+ )
+ except Exception as exc:
+ logger.error("sharepoint ingestion %s failed: %s", entryPath, exc, exc_info=True)
+ result.failed += 1
+ result.errors.append(f"ingest({entryPath}): {exc}")
+ return
+
+ if handle.status == "duplicate":
+ result.skippedDuplicate += 1
+ elif handle.status == "indexed":
+ result.indexed += 1
+ else:
+ result.failed += 1
+ if handle.error:
+ result.errors.append(f"ingest({entryPath}): {handle.error}")
+
+ if progressCb is not None and (result.indexed + result.skippedDuplicate) % 50 == 0:
+ processed = result.indexed + result.skippedDuplicate
+ try:
+ progressCb(
+ min(90, 10 + int(80 * processed / max(1, limits.maxItems))),
+ f"sharepoint processed={processed}",
+ )
+ except Exception:
+ pass
+ logger.info(
+ "ingestion.connection.bootstrap.progress part=sharepoint processed=%d skippedDup=%d failed=%d",
+ processed, result.skippedDuplicate, result.failed,
+ extra={
+ "event": "ingestion.connection.bootstrap.progress",
+ "part": "sharepoint",
+ "connectionId": connectionId,
+ "processed": processed,
+ "skippedDup": result.skippedDuplicate,
+ "failed": result.failed,
+ },
+ )
+
+ # Yield between items so the event loop can interleave other tasks:
+ # download/extract lean on synchronous, CPU-bound libraries, and an
+ # explicit checkpoint keeps other coroutines from starving.
+ await asyncio.sleep(0)
+
+
+def _finalizeResult(connectionId: str, result: SharepointBootstrapResult, startMs: float) -> Dict[str, Any]:
+ durationMs = int((time.time() - startMs) * 1000)
+ logger.info(
+ "ingestion.connection.bootstrap.done part=sharepoint connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d durationMs=%d",
+ connectionId,
+ result.indexed, result.skippedDuplicate, result.skippedPolicy, result.failed,
+ durationMs,
+ extra={
+ "event": "ingestion.connection.bootstrap.done",
+ "part": "sharepoint",
+ "connectionId": connectionId,
+ "indexed": result.indexed,
+ "skippedDup": result.skippedDuplicate,
+ "skippedPolicy": result.skippedPolicy,
+ "failed": result.failed,
+ "durationMs": durationMs,
+ },
+ )
+ return {
+ "connectionId": result.connectionId,
+ "indexed": result.indexed,
+ "skippedDuplicate": result.skippedDuplicate,
+ "skippedPolicy": result.skippedPolicy,
+ "failed": result.failed,
+ "bytesProcessed": result.bytesProcessed,
+ "durationMs": durationMs,
+ "errors": result.errors[:20],
+ }
diff --git a/modules/serviceCenter/services/serviceKnowledge/subTextClean.py b/modules/serviceCenter/services/serviceKnowledge/subTextClean.py
new file mode 100644
index 00000000..2d352cfa
--- /dev/null
+++ b/modules/serviceCenter/services/serviceKnowledge/subTextClean.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Text normalisation utilities used by knowledge ingestion.
+
+The email body cleaning logic is intentionally regex-based and operates on
+plain text after an HTML → text pass, so unsanitised HTML/JS never reaches
+the knowledge store and retrieval stays robust (no extraneous markup tokens
+eating the embedding budget).
+"""
+
+from __future__ import annotations
+
+import re
+from typing import Optional
+
+DEFAULT_MAX_CHARS = 8000
+
+
+_QUOTE_MARKER_PATTERNS = [
+ re.compile(r"^\s*(?:On\s.+?\swrote:)\s*$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*(?:Am\s.+?\sschrieb.+?:)\s*$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*-{2,}\s*Original\s*Message\s*-{2,}\s*$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*-{2,}\s*Urspr.+Nachricht\s*-{2,}\s*$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*From:\s+.+$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*Von:\s+.+$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*Sent:\s+.+$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*Gesendet:\s+.+$", re.MULTILINE | re.IGNORECASE),
+]
+
+_SIGNATURE_MARKERS = [
+ re.compile(r"^\s*-{2,}\s*$", re.MULTILINE),
+ re.compile(r"^\s*—\s*$", re.MULTILINE),
+ re.compile(r"^\s*Best regards\b.*$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*Kind regards\b.*$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*Mit freundlichen Gr[üu]ßen\b.*$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*Viele Gr[üu]ße\b.*$", re.MULTILINE | re.IGNORECASE),
+ re.compile(r"^\s*Best,\s*$", re.MULTILINE | re.IGNORECASE),
+]
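+# Illustrative lines each marker family is meant to catch (assumed samples,
+# not an exhaustive fixture list):
+#
+#     "On Mon, 1 Jan 2024 at 10:00, Alice wrote:"  → quote marker (EN)
+#     "Am 01.01.2024 um 10:00 schrieb Max:"        → quote marker (DE)
+#     "--" on its own line                          → signature delimiter
+#     "Mit freundlichen Grüßen"                     → signature salutation (DE)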
+
+
+def _htmlToText(html: str) -> str:
+ """Prefer BeautifulSoup when available, fall back to regex."""
+ try:
+ from bs4 import BeautifulSoup # type: ignore
+
+ soup = BeautifulSoup(html, "html.parser")
+ for tag in soup(["script", "style", "head"]):
+ tag.decompose()
+ for br in soup.find_all(["br"]):
+ br.replace_with("\n")
+ for p in soup.find_all(["p", "div", "li", "tr"]):
+ p.append("\n")
+ text = soup.get_text()
+ except Exception:
+ # Minimal fallback: strip tags crudely.
+ text = re.sub(r"
", "\n", html, flags=re.IGNORECASE)
+ text = re.sub(r"(?:p|div|li|tr)>", "\n", text, flags=re.IGNORECASE)
+ text = re.sub(r"<[^>]+>", "", text)
+ # Collapse non-breaking + zero-width whitespace.
+ text = text.replace("\u00a0", " ").replace("\u200b", "")
+ return text
+
+
+def _stripQuotedThread(text: str) -> str:
+ """Remove reply-chain content so only the author's own contribution remains."""
+ earliest = len(text)
+ for pattern in _QUOTE_MARKER_PATTERNS:
+ match = pattern.search(text)
+ if match and match.start() < earliest:
+ earliest = match.start()
+ # Drop any block starting with "> " quoted lines (often Gmail/Thunderbird).
+ quotedBlock = re.search(r"^(?:\s*>.*\n?)+", text, re.MULTILINE)
+ if quotedBlock and quotedBlock.start() < earliest:
+ earliest = quotedBlock.start()
+ return text[:earliest].rstrip()
+
+
+def _stripSignature(text: str) -> str:
+ earliest = len(text)
+ for pattern in _SIGNATURE_MARKERS:
+ match = pattern.search(text)
+ if match and match.start() < earliest:
+ earliest = match.start()
+ return text[:earliest].rstrip()
+
+
+def _collapseWhitespace(text: str) -> str:
+ text = re.sub(r"[ \t]+", " ", text)
+ text = re.sub(r"\n{3,}", "\n\n", text)
+ return text.strip()
+
+
+def cleanEmailBody(html: str, maxChars: Optional[int] = DEFAULT_MAX_CHARS) -> str:
+ """Return a compact plain-text view of an email body suitable for embedding.
+
+ Steps: HTML → text, remove quoted reply chain, remove signature, collapse
+ whitespace, truncate to maxChars. Always returns a string (possibly empty).
+ """
+ if not html:
+ return ""
+ text = _htmlToText(html) if "<" in html and ">" in html else html
+ text = _stripQuotedThread(text)
+ text = _stripSignature(text)
+ text = _collapseWhitespace(text)
+ if maxChars and len(text) > maxChars:
+ text = text[:maxChars].rstrip() + "…"
+ return text
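+
+
+# Illustrative round-trip (assumed input, not a stored fixture):
+#
+#     cleanEmailBody("<p>Answer.</p><br>--<br>Alice")  → "Answer."
+#
+# The HTML is flattened first, then the "--" line and everything after it is
+# dropped as a signature.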
diff --git a/tests/unit/services/test_bootstrap_outlook.py b/tests/unit/services/test_bootstrap_outlook.py
new file mode 100644
index 00000000..26664eaa
--- /dev/null
+++ b/tests/unit/services/test_bootstrap_outlook.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Bootstrap Outlook tests with a fake adapter + knowledge service.
+
+Verifies:
+- Well-known folders (inbox, sentitems) are discovered via Graph.
+- Each message produces a `requestIngestion` call with sourceKind=outlook_message
+ and structured contentObjects (header / snippet / body).
+- Pagination via `@odata.nextLink` is followed.
+- changeKey is forwarded as contentVersion → idempotency.
+"""
+
+import asyncio
+import os
+import sys
+from types import SimpleNamespace
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
+
+from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncOutlook import (
+ bootstrapOutlook,
+ OutlookBootstrapLimits,
+ _syntheticMessageId,
+ _buildContentObjects,
+)
+
+
+class _FakeOutlookAdapter:
+ def __init__(self, messages_by_folder, paginated_folder=None, page2=None):
+ self._folders = {"inbox": "INBOX-ID", "sentitems": "SENT-ID"}
+ self._messages = messages_by_folder
+ self._paginated_folder = paginated_folder
+ self._page2 = page2 or []
+ self.requested_endpoints = []
+
+ async def _graphGet(self, endpoint: str):
+ self.requested_endpoints.append(endpoint)
+ if endpoint.startswith("me/mailFolders/") and "/messages" not in endpoint:
+ wellKnown = endpoint.split("/")[-1]
+ fid = self._folders.get(wellKnown)
+ if not fid:
+ return {"error": "not found"}
+ return {"id": fid, "displayName": wellKnown}
+ # message page request: e.g. me/mailFolders/INBOX-ID/messages?...
+ for fid, messages in self._messages.items():
+ if f"me/mailFolders/{fid}/messages" in endpoint:
+ page = {"value": messages}
+ if fid == self._paginated_folder and "skiptoken" not in endpoint:
+ page["@odata.nextLink"] = (
+ "https://graph.microsoft.com/v1.0/"
+ f"me/mailFolders/{fid}/messages?$skiptoken=abc"
+ )
+ elif fid == self._paginated_folder and "skiptoken" in endpoint:
+ page = {"value": self._page2}
+ return page
+ return {"value": []}
+
+ async def browse(self, path):
+ return []
+
+
+class _FakeKnowledgeService:
+ def __init__(self, duplicateIds=None):
+ self.calls = []
+ self._duplicates = duplicateIds or set()
+
+ async def requestIngestion(self, job):
+ self.calls.append(job)
+ status = "duplicate" if job.sourceId in self._duplicates else "indexed"
+ return SimpleNamespace(
+ jobId=job.sourceId, status=status, contentHash="h",
+ fileId=job.sourceId, index=None, error=None,
+ )
+
+
+def _msg(mid: str, subject: str = "Hi", change: str = "ck1"):
+ return {
+ "id": mid,
+ "subject": subject,
+ "from": {"emailAddress": {"name": "Alice", "address": "a@x.com"}},
+ "toRecipients": [{"emailAddress": {"name": "Bob", "address": "b@x.com"}}],
+ "ccRecipients": [],
+ "receivedDateTime": "2025-01-01T10:00:00Z",
+ "bodyPreview": "Hello world",
+ "body": {"contentType": "text", "content": "Hello world\nThis is the body."},
+ "internetMessageId": f"<{mid}@local>",
+ "hasAttachments": False,
+ "changeKey": change,
+ }
+
+
+def test_buildContentObjects_emits_header_snippet_body():
+ parts = _buildContentObjects(_msg("m1"), maxBodyChars=8000)
+ ids = [p["contentObjectId"] for p in parts]
+ assert ids == ["header", "snippet", "body"]
+ header = parts[0]["data"]
+ assert "Subject: Hi" in header
+ assert "From: Alice " in header
+ assert "To: Bob " in header
+
+
+def test_bootstrap_outlook_indexes_messages_from_inbox_and_sent():
+ adapter = _FakeOutlookAdapter({
+ "INBOX-ID": [_msg("m1"), _msg("m2")],
+ "SENT-ID": [_msg("m3")],
+ })
+ knowledge = _FakeKnowledgeService()
+ connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+ async def _run():
+ return await bootstrapOutlook(
+ connectionId="c1",
+ adapter=adapter,
+ connection=connection,
+ knowledgeService=knowledge,
+ limits=OutlookBootstrapLimits(maxAgeDays=None),
+ )
+
+ result = asyncio.run(_run())
+ assert result["indexed"] == 3
+ sourceIds = {c.sourceId for c in knowledge.calls}
+ assert sourceIds == {
+ _syntheticMessageId("c1", "m1"),
+ _syntheticMessageId("c1", "m2"),
+ _syntheticMessageId("c1", "m3"),
+ }
+ for job in knowledge.calls:
+ assert job.sourceKind == "outlook_message"
+ assert job.mimeType == "message/rfc822"
+ assert job.provenance["connectionId"] == "c1"
+ assert job.provenance["service"] == "outlook"
+ assert job.contentVersion == "ck1"
+ assert any(co["contentObjectId"] == "header" for co in job.contentObjects)
+
+
+def test_bootstrap_outlook_follows_pagination():
+ adapter = _FakeOutlookAdapter(
+ messages_by_folder={"INBOX-ID": [_msg("m1")], "SENT-ID": []},
+ paginated_folder="INBOX-ID",
+ page2=[_msg("m2"), _msg("m3")],
+ )
+ knowledge = _FakeKnowledgeService()
+ connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+ async def _run():
+ return await bootstrapOutlook(
+ connectionId="c1",
+ adapter=adapter,
+ connection=connection,
+ knowledgeService=knowledge,
+ limits=OutlookBootstrapLimits(maxAgeDays=None),
+ )
+
+ result = asyncio.run(_run())
+ assert result["indexed"] == 3
+
+
+def test_bootstrap_outlook_reports_duplicates():
+ adapter = _FakeOutlookAdapter({
+ "INBOX-ID": [_msg("m1"), _msg("m2")],
+ "SENT-ID": [],
+ })
+ duplicates = {
+ _syntheticMessageId("c1", "m1"),
+ _syntheticMessageId("c1", "m2"),
+ }
+ knowledge = _FakeKnowledgeService(duplicateIds=duplicates)
+ connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+ async def _run():
+ return await bootstrapOutlook(
+ connectionId="c1",
+ adapter=adapter,
+ connection=connection,
+ knowledgeService=knowledge,
+ limits=OutlookBootstrapLimits(maxAgeDays=None),
+ )
+
+ result = asyncio.run(_run())
+ assert result["indexed"] == 0
+ assert result["skippedDuplicate"] == 2
+
+
+if __name__ == "__main__":
+ test_buildContentObjects_emits_header_snippet_body()
+ test_bootstrap_outlook_indexes_messages_from_inbox_and_sent()
+ test_bootstrap_outlook_follows_pagination()
+ test_bootstrap_outlook_reports_duplicates()
+ print("OK — bootstrapOutlook tests passed")
diff --git a/tests/unit/services/test_bootstrap_sharepoint.py b/tests/unit/services/test_bootstrap_sharepoint.py
new file mode 100644
index 00000000..8b011357
--- /dev/null
+++ b/tests/unit/services/test_bootstrap_sharepoint.py
@@ -0,0 +1,209 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Bootstrap SharePoint tests with a fake adapter + knowledge service.
+
+Verifies:
+- Every discovered file triggers `requestIngestion`.
+- Duplicate runs (same eTag revisions) report `skippedDuplicate`.
+- Synthetic fileIds are stable across runs so idempotency works end-to-end.
+"""
+
+import asyncio
+import os
+import sys
+from dataclasses import dataclass
+from types import SimpleNamespace
+from typing import Any, Dict, List, Optional
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
+
+from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint import (
+ bootstrapSharepoint,
+ _syntheticFileId,
+)
+
+
+@dataclass
+class _ExtEntry:
+ name: str
+ path: str
+ isFolder: bool = False
+ size: Optional[int] = None
+ mimeType: Optional[str] = None
+ metadata: Optional[Dict[str, Any]] = None
+
+
+class _FakeSpAdapter:
+ """Minimal SharepointAdapter stand-in.
+
+ Layout:
+ "/" → 1 site
+ "/sites/site-1" → 2 files (f1, f2) + 1 folder (sub)
+ "/sites/site-1/sub" → 1 file (f3)
+ """
+
+ def __init__(self):
+ self.downloaded: List[str] = []
+
+ async def browse(self, path: str, filter=None, limit=None):
+ if path == "/":
+ return [
+ _ExtEntry(
+ name="Site 1",
+ path="/sites/site-1",
+ isFolder=True,
+ metadata={"id": "site-1"},
+ ),
+ ]
+ if path == "/sites/site-1":
+ return [
+ _ExtEntry(
+ name="f1.txt", path="/sites/site-1/f1.txt",
+ mimeType="text/plain", size=20,
+ metadata={"id": "f1", "revision": "etag-f1"},
+ ),
+ _ExtEntry(
+ name="f2.txt", path="/sites/site-1/f2.txt",
+ mimeType="text/plain", size=20,
+ metadata={"id": "f2", "revision": "etag-f2"},
+ ),
+ _ExtEntry(
+ name="sub", path="/sites/site-1/sub",
+ isFolder=True, metadata={"id": "sub"},
+ ),
+ ]
+ if path == "/sites/site-1/sub":
+ return [
+ _ExtEntry(
+ name="f3.txt", path="/sites/site-1/sub/f3.txt",
+ mimeType="text/plain", size=20,
+ metadata={"id": "f3", "revision": "etag-f3"},
+ ),
+ ]
+ return []
+
+ async def download(self, path: str) -> bytes:
+ self.downloaded.append(path)
+ return path.encode("utf-8")
+
+
+class _FakeKnowledgeService:
+ """Records requestIngestion calls and returns the scripted handles."""
+
+ def __init__(self, duplicateIds=None):
+ self.calls: List[SimpleNamespace] = []
+ self._duplicateIds = duplicateIds or set()
+
+ async def requestIngestion(self, job):
+ self.calls.append(job)
+ status = "duplicate" if job.sourceId in self._duplicateIds else "indexed"
+ return SimpleNamespace(
+ jobId=f"{job.sourceKind}:{job.sourceId}",
+ status=status,
+ contentHash="h",
+ fileId=job.sourceId,
+ index=None,
+ error=None,
+ )
+
+
+def _fakeRunExtraction(data, name, mime, options):
+ """Produce a single synthetic text part so `_toContentObjects` returns one."""
+ return SimpleNamespace(
+ parts=[
+ SimpleNamespace(
+ id="p1",
+ data=data.decode("utf-8") if isinstance(data, bytes) else str(data),
+ typeGroup="text",
+ label="page:1",
+ metadata={"pageIndex": 0},
+ )
+ ]
+ )
+
+
+def test_bootstrap_walks_sites_and_subfolders():
+ adapter = _FakeSpAdapter()
+ knowledge = _FakeKnowledgeService()
+ connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+ async def _run():
+ return await bootstrapSharepoint(
+ connectionId="c1",
+ adapter=adapter,
+ connection=connection,
+ knowledgeService=knowledge,
+ runExtractionFn=_fakeRunExtraction,
+ )
+
+ result = asyncio.run(_run())
+ assert len(knowledge.calls) == 3
+ sourceIds = {c.sourceId for c in knowledge.calls}
+ assert sourceIds == {
+ _syntheticFileId("c1", "f1"),
+ _syntheticFileId("c1", "f2"),
+ _syntheticFileId("c1", "f3"),
+ }
+ assert result["indexed"] == 3
+ assert result["skippedDuplicate"] == 0
+ assert adapter.downloaded == [
+ "/sites/site-1/f1.txt",
+ "/sites/site-1/f2.txt",
+ "/sites/site-1/sub/f3.txt",
+ ]
+
+
+def test_bootstrap_reports_duplicates_on_second_run():
+ adapter = _FakeSpAdapter()
+ duplicateIds = {
+ _syntheticFileId("c1", "f1"),
+ _syntheticFileId("c1", "f2"),
+ _syntheticFileId("c1", "f3"),
+ }
+ knowledge = _FakeKnowledgeService(duplicateIds=duplicateIds)
+ connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+ async def _run():
+ return await bootstrapSharepoint(
+ connectionId="c1",
+ adapter=adapter,
+ connection=connection,
+ knowledgeService=knowledge,
+ runExtractionFn=_fakeRunExtraction,
+ )
+
+ result = asyncio.run(_run())
+ assert result["indexed"] == 0
+ assert result["skippedDuplicate"] == 3
+
+
+def test_bootstrap_passes_connection_provenance():
+ adapter = _FakeSpAdapter()
+ knowledge = _FakeKnowledgeService()
+ connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+ async def _run():
+ return await bootstrapSharepoint(
+ connectionId="c1",
+ adapter=adapter,
+ connection=connection,
+ knowledgeService=knowledge,
+ runExtractionFn=_fakeRunExtraction,
+ )
+
+ asyncio.run(_run())
+ for job in knowledge.calls:
+ assert job.sourceKind == "sharepoint_item"
+ assert job.mandateId == "m1"
+ assert job.provenance["connectionId"] == "c1"
+ assert job.provenance["authority"] == "msft"
+ assert job.provenance["service"] == "sharepoint"
+ assert job.contentVersion and job.contentVersion.startswith("etag-")
+
+
+if __name__ == "__main__":
+ test_bootstrap_walks_sites_and_subfolders()
+ test_bootstrap_reports_duplicates_on_second_run()
+ test_bootstrap_passes_connection_provenance()
+ print("OK — bootstrapSharepoint tests passed")
diff --git a/tests/unit/services/test_clean_email_body.py b/tests/unit/services/test_clean_email_body.py
new file mode 100644
index 00000000..a3ee01df
--- /dev/null
+++ b/tests/unit/services/test_clean_email_body.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Unit tests for cleanEmailBody.
+
+Covers: HTML→text normalisation, quoted-reply removal, signature removal,
+whitespace collapse and truncation. The utility is used during Outlook
+bootstrap; buggy cleaning would leak quoted threads / signatures into every
+embedding.
+"""
+
+import os
+import sys
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
+
+from modules.serviceCenter.services.serviceKnowledge.subTextClean import (
+ cleanEmailBody,
+)
+
+
+def test_strips_html_tags_and_scripts():
+ html = (
+ ""
+ "Hello world
"
+ ""
+ )
+ cleaned = cleanEmailBody(html)
+ assert "Hello" in cleaned
+ assert "world" in cleaned
+ assert "<" not in cleaned
+ assert "alert" not in cleaned
+
+
+def test_strips_quoted_reply_english():
+ body = (
+ "Actual answer from me.\n\n"
+ "On Mon, 1 Jan 2024 at 10:00, Someone wrote:\n"
+ "> Original question?\n"
+ "> Second line.\n"
+ )
+ cleaned = cleanEmailBody(body)
+ assert "Actual answer" in cleaned
+ assert "Original question" not in cleaned
+ assert "wrote:" not in cleaned
+
+
+def test_strips_quoted_reply_german():
+ body = (
+ "Meine Antwort.\n\n"
+ "Am 1. Januar 2024 um 10:00 schrieb Max Muster :\n"
+ "> Ursprüngliche Frage?\n"
+ )
+ cleaned = cleanEmailBody(body)
+ assert "Meine Antwort" in cleaned
+ assert "Ursprüngliche Frage" not in cleaned
+
+
+def test_strips_signature_after_dashes():
+ body = (
+ "Kurze Nachricht.\n"
+ "\n"
+ "--\n"
+ "Max Muster\n"
+ "Vorstand, Beispiel GmbH\n"
+ )
+ cleaned = cleanEmailBody(body)
+ assert "Kurze Nachricht" in cleaned
+ assert "Beispiel GmbH" not in cleaned
+
+
+def test_strips_signature_salutation_de():
+ body = (
+ "Die eigentliche Information steht hier.\n\n"
+ "Mit freundlichen Grüßen\n"
+ "Max Muster"
+ )
+ cleaned = cleanEmailBody(body)
+ assert "eigentliche Information" in cleaned
+ assert "Max Muster" not in cleaned
+
+
+def test_truncate_to_max_chars():
+ body = "abc " * 5000
+ cleaned = cleanEmailBody(body, maxChars=200)
+ assert len(cleaned) <= 201 # includes trailing ellipsis
+
+
+def test_empty_input_returns_empty_string():
+ assert cleanEmailBody("") == ""
+ assert cleanEmailBody(None) == "" # type: ignore[arg-type]
+
+
+def test_collapses_whitespace():
+ body = "A lot of spaces\n\n\n\nand blank lines"
+ cleaned = cleanEmailBody(body)
+ assert " " not in cleaned
+ assert "\n\n\n" not in cleaned
+
+
+if __name__ == "__main__":
+ test_strips_html_tags_and_scripts()
+ test_strips_quoted_reply_english()
+ test_strips_quoted_reply_german()
+ test_strips_signature_after_dashes()
+ test_strips_signature_salutation_de()
+ test_truncate_to_max_chars()
+ test_empty_input_returns_empty_string()
+ test_collapses_whitespace()
+ print("OK — cleanEmailBody tests passed")
diff --git a/tests/unit/services/test_connection_purge.py b/tests/unit/services/test_connection_purge.py
new file mode 100644
index 00000000..c32cb5b3
--- /dev/null
+++ b/tests/unit/services/test_connection_purge.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Purge tests for KnowledgeObjects.deleteFileContentIndexByConnectionId.
+
+Ensures that a `connection.revoked` event wipes every FileContentIndex + chunk
+linked to the given connectionId while leaving entries from other connections
+(or upload-files with connectionId=None) intact.
+"""
+
+import os
+import sys
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
+
+from modules.datamodels.datamodelKnowledge import FileContentIndex, ContentChunk
+from modules.interfaces.interfaceDbKnowledge import KnowledgeObjects
+
+
+class _FakeDb:
+ """Minimal in-memory stand-in for ``KnowledgeObjects.db``.
+
+ Supports just the subset of APIs that deleteFileContentIndexByConnectionId
+ touches: getRecordset(FileContentIndex|ContentChunk, ...) + recordDelete.
+ """
+
+ def __init__(self):
+ self.indexRows: dict = {}
+ self.chunks: dict = {}
+
+ def addIndex(self, row: dict) -> None:
+ self.indexRows[row["id"]] = row
+
+ def addChunk(self, row: dict) -> None:
+ self.chunks[row["id"]] = row
+
+ def getRecordset(self, modelClass, recordFilter=None, **_):
+ filter_ = recordFilter or {}
+ if modelClass is FileContentIndex:
+ rows = list(self.indexRows.values())
+ elif modelClass is ContentChunk:
+ rows = list(self.chunks.values())
+ else:
+ return []
+
+ def match(row):
+ for k, v in filter_.items():
+ if row.get(k) != v:
+ return False
+ return True
+
+ return [r for r in rows if match(r)]
+
+ def recordDelete(self, modelClass, recordId):
+ if modelClass is FileContentIndex:
+ return self.indexRows.pop(recordId, None) is not None
+ if modelClass is ContentChunk:
+ return self.chunks.pop(recordId, None) is not None
+ return False
+
+
+def _buildKnowledge():
+ """Instantiate KnowledgeObjects without triggering the real DB bootstrap."""
+ ko = KnowledgeObjects.__new__(KnowledgeObjects)
+ ko.currentUser = None
+ ko.userId = None
+ ko._scopeCache = {}
+ ko.db = _FakeDb()
+ return ko
+
+
+def test_purge_by_connection_removes_only_matching_rows():
+ ko = _buildKnowledge()
+ ko.db.addIndex({"id": "sp1", "connectionId": "cx", "mandateId": "m1", "sourceKind": "sharepoint_item"})
+ ko.db.addIndex({"id": "sp2", "connectionId": "cx", "mandateId": "m1", "sourceKind": "sharepoint_item"})
+ ko.db.addIndex({"id": "upload", "connectionId": None, "mandateId": "m1", "sourceKind": "file"})
+ ko.db.addIndex({"id": "other", "connectionId": "cy", "mandateId": "m1", "sourceKind": "outlook_message"})
+ ko.db.addChunk({"id": "c1", "fileId": "sp1"})
+ ko.db.addChunk({"id": "c2", "fileId": "sp1"})
+ ko.db.addChunk({"id": "c3", "fileId": "sp2"})
+ ko.db.addChunk({"id": "c4", "fileId": "upload"})
+ ko.db.addChunk({"id": "c5", "fileId": "other"})
+
+ result = ko.deleteFileContentIndexByConnectionId("cx")
+
+ assert result == {"indexRows": 2, "chunks": 3}
+ assert "sp1" not in ko.db.indexRows
+ assert "sp2" not in ko.db.indexRows
+ assert "upload" in ko.db.indexRows
+ assert "other" in ko.db.indexRows
+ assert set(ko.db.chunks.keys()) == {"c4", "c5"}
+
+
+def test_purge_with_empty_connection_id_is_a_noop():
+ ko = _buildKnowledge()
+ ko.db.addIndex({"id": "sp1", "connectionId": "cx"})
+ ko.db.addChunk({"id": "c1", "fileId": "sp1"})
+
+ result = ko.deleteFileContentIndexByConnectionId("")
+
+ assert result == {"indexRows": 0, "chunks": 0}
+ assert "sp1" in ko.db.indexRows
+
+
+def test_purge_unknown_connection_returns_zero():
+ ko = _buildKnowledge()
+ ko.db.addIndex({"id": "sp1", "connectionId": "cx"})
+
+ result = ko.deleteFileContentIndexByConnectionId("nope")
+
+ assert result == {"indexRows": 0, "chunks": 0}
+ assert "sp1" in ko.db.indexRows
+
+
+if __name__ == "__main__":
+ test_purge_by_connection_removes_only_matching_rows()
+ test_purge_with_empty_connection_id_is_a_noop()
+ test_purge_unknown_connection_returns_zero()
+ print("OK — connection-purge tests passed")
diff --git a/tests/unit/services/test_knowledge_ingest_consumer.py b/tests/unit/services/test_knowledge_ingest_consumer.py
new file mode 100644
index 00000000..760e1ed6
--- /dev/null
+++ b/tests/unit/services/test_knowledge_ingest_consumer.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Unit tests for KnowledgeIngestionConsumer event dispatch.
+
+- `connection.established` → enqueue a `connection.bootstrap` job.
+- `connection.revoked` → synchronous purge via KnowledgeObjects.
+"""
+
+import asyncio
+import os
+import sys
+import types
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
+
+from modules.serviceCenter.services.serviceKnowledge import subConnectorIngestConsumer as consumer
+
+
+def _resetRegistration(monkeypatch):
+ """Force the module-level guard to register fresh in each test."""
+ monkeypatch.setattr(consumer, "_registered", False)
+
+
+def test_onConnectionEstablished_enqueues_bootstrap(monkeypatch):
+ startedJobs = []
+
+ async def _fakeStartJob(jobType, payload, **kwargs):
+ startedJobs.append({"jobType": jobType, "payload": payload, "kwargs": kwargs})
+ return "job-1"
+
+ monkeypatch.setattr(consumer, "startJob", _fakeStartJob)
+ consumer._onConnectionEstablished(
+ connectionId="c1", authority="msft", userId="u1"
+ )
+ # No event loop is running at this point, so the consumer drives the
+ # startJob coroutine to completion itself (via asyncio.run); the fake has
+ # therefore already recorded the call by the time we assert below.
+
+ assert len(startedJobs) == 1
+ assert startedJobs[0]["jobType"] == consumer.BOOTSTRAP_JOB_TYPE
+ assert startedJobs[0]["payload"]["connectionId"] == "c1"
+ assert startedJobs[0]["payload"]["authority"] == "msft"
+ assert startedJobs[0]["kwargs"]["triggeredBy"] == "u1"
+
+
+def test_onConnectionEstablished_ignores_missing_id(monkeypatch):
+ called = []
+
+ async def _fakeStartJob(*a, **kw):
+ called.append(1)
+ return "x"
+
+ monkeypatch.setattr(consumer, "startJob", _fakeStartJob)
+ consumer._onConnectionEstablished(connectionId="", authority="msft")
+ assert called == []
+
+
+def test_onConnectionRevoked_runs_sync_purge(monkeypatch):
+ class _FakeKnowledge:
+ def __init__(self):
+ self.calls = []
+
+ def deleteFileContentIndexByConnectionId(self, cid):
+ self.calls.append(cid)
+ return {"indexRows": 2, "chunks": 5}
+
+ fakeKnow = _FakeKnowledge()
+
+ def _fakeGetInterface(_user=None):
+ return fakeKnow
+
+ monkeypatch.setattr(consumer, "getKnowledgeInterface", _fakeGetInterface)
+ consumer._onConnectionRevoked(
+ connectionId="c1", authority="msft", userId="u1", reason="disconnected"
+ )
+ assert fakeKnow.calls == ["c1"]
+
+
+def test_onConnectionRevoked_ignores_missing_id(monkeypatch):
+ seen = []
+
+ def _fakeGetInterface(_user=None):
+ class _K:
+ def deleteFileContentIndexByConnectionId(self, cid):
+ seen.append(cid)
+ return {"indexRows": 0, "chunks": 0}
+
+ return _K()
+
+ monkeypatch.setattr(consumer, "getKnowledgeInterface", _fakeGetInterface)
+ consumer._onConnectionRevoked(connectionId="")
+ assert seen == []
+
+
+def test_bootstrap_job_skips_non_pilot_authority(monkeypatch):
+ async def _run():
+ result = await consumer._bootstrapJobHandler(
+ {"payload": {"connectionId": "c1", "authority": "google"}},
+ lambda *_: None,
+ )
+ return result
+
+ result = asyncio.run(_run())
+ assert result["skipped"] is True
+ assert result["authority"] == "google"
+
+
+def test_bootstrap_job_dispatches_msft_parts(monkeypatch):
+ calls = {"sp": 0, "ol": 0}
+
+ async def _fakeSp(connectionId, progressCb=None):
+ calls["sp"] += 1
+ return {"indexed": 1}
+
+ async def _fakeOl(connectionId, progressCb=None):
+ calls["ol"] += 1
+ return {"indexed": 2}
+
+ # subConnectorSync* are lazy-imported inside the handler; install fake
+ # modules before invoking.
+ fakeSharepoint = types.ModuleType("subConnectorSyncSharepoint")
+ fakeSharepoint.bootstrapSharepoint = _fakeSp
+ fakeOutlook = types.ModuleType("subConnectorSyncOutlook")
+ fakeOutlook.bootstrapOutlook = _fakeOl
+ monkeypatch.setitem(
+ sys.modules,
+ "modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint",
+ fakeSharepoint,
+ )
+ monkeypatch.setitem(
+ sys.modules,
+ "modules.serviceCenter.services.serviceKnowledge.subConnectorSyncOutlook",
+ fakeOutlook,
+ )
+
+ async def _run():
+ return await consumer._bootstrapJobHandler(
+ {"payload": {"connectionId": "c1", "authority": "msft"}},
+ lambda *_: None,
+ )
+
+ result = asyncio.run(_run())
+ assert calls == {"sp": 1, "ol": 1}
+ assert result["sharepoint"] == {"indexed": 1}
+ assert result["outlook"] == {"indexed": 2}
+
+
+if __name__ == "__main__":
+ # Usable without pytest fixtures for a quick smoke run.
+ class _MP:
+ def __init__(self):
+ self.undos = []
+
+ def setattr(self, target, name_or_value, value=None):
+ if value is None:
+ # target is an object, name_or_value is value → no, original signature
+ raise SystemExit("use pytest monkeypatch in CLI")
+ self.undos.append((target, name_or_value, getattr(target, name_or_value)))
+ setattr(target, name_or_value, value)
+
+ def setitem(self, mapping, key, value):
+ self.undos.append((mapping, key, mapping.get(key)))
+ mapping[key] = value
+
+ print("Run via pytest: pytest tests/unit/services/test_knowledge_ingest_consumer.py")