From 93cb6939dc6a42212b100259538f6951594114fb Mon Sep 17 00:00:00 2001 From: Ida Date: Wed, 29 Apr 2026 09:13:57 +0200 Subject: [PATCH] feat: frontend consent integration --- modules/datamodels/datamodelUam.py | 18 +- modules/interfaces/interfaceDbApp.py | 41 +-- modules/routes/routeDataConnections.py | 13 +- modules/routes/routeSecurityClickup.py | 23 +- modules/routes/routeSecurityGoogle.py | 23 +- modules/routes/routeSecurityMsft.py | 23 +- .../serviceKnowledge/mainServiceKnowledge.py | 9 +- .../subConnectorIngestConsumer.py | 21 ++ .../serviceKnowledge/subConnectorPrefs.py | 101 ++++++ .../subConnectorSyncClickup.py | 65 ++-- .../subConnectorSyncGdrive.py | 16 +- .../serviceKnowledge/subConnectorSyncGmail.py | 60 +++- .../subConnectorSyncOutlook.py | 65 ++-- .../subConnectorSyncSharepoint.py | 10 +- tests/unit/services/test_p1d_consent_prefs.py | 298 ++++++++++++++++++ 15 files changed, 678 insertions(+), 108 deletions(-) create mode 100644 modules/serviceCenter/services/serviceKnowledge/subConnectorPrefs.py create mode 100644 tests/unit/services/test_p1d_consent_prefs.py diff --git a/modules/datamodels/datamodelUam.py b/modules/datamodels/datamodelUam.py index 0f7fe6b8..6aba24eb 100644 --- a/modules/datamodels/datamodelUam.py +++ b/modules/datamodels/datamodelUam.py @@ -475,7 +475,23 @@ class UserConnection(PowerOnModel): description="OAuth scopes granted for this connection", json_schema_extra={"frontend_type": "list", "frontend_readonly": True, "frontend_required": False, "label": "Gewährte Berechtigungen"}, ) - + knowledgeIngestionEnabled: bool = Field( + default=False, + description="Whether the user has consented to knowledge ingestion for this connection", + json_schema_extra={"frontend_type": "boolean", "frontend_readonly": False, "frontend_required": False, "label": "Wissensdatenbank aktiv"}, + ) + knowledgePreferences: Optional[Dict[str, Any]] = Field( + default=None, + description=( + "Per-connection knowledge ingestion preferences. schemaVersion=1 keys: " + "neutralizeBeforeEmbed (bool), mailContentDepth (metadata|snippet|full), " + "mailIndexAttachments (bool), filesIndexBinaries (bool), mimeAllowlist (list[str]), " + "clickupScope (titles|title_description|with_comments), " + "surfaceToggles (dict per authority), maxAgeDays (int)." + ), + json_schema_extra={"frontend_type": "json", "frontend_readonly": False, "frontend_required": False, "label": "Wissenspräferenzen"}, + ) + @computed_field @property def connectionReference(self) -> str: diff --git a/modules/interfaces/interfaceDbApp.py b/modules/interfaces/interfaceDbApp.py index 51519a29..04ae82ff 100644 --- a/modules/interfaces/interfaceDbApp.py +++ b/modules/interfaces/interfaceDbApp.py @@ -1268,19 +1268,7 @@ class AppObjects: result = [] for conn_dict in connections: try: - # Create UserConnection object - connection = UserConnection( - id=conn_dict["id"], - userId=conn_dict["userId"], - authority=conn_dict.get("authority"), - externalId=conn_dict.get("externalId", ""), - externalUsername=conn_dict.get("externalUsername", ""), - externalEmail=conn_dict.get("externalEmail"), - status=conn_dict.get("status", "pending"), - connectedAt=conn_dict.get("connectedAt"), - lastChecked=conn_dict.get("lastChecked"), - expiresAt=conn_dict.get("expiresAt"), - ) + connection = UserConnection.model_validate(conn_dict) result.append(connection) except Exception as e: logger.error( @@ -1317,18 +1305,21 @@ class AppObjects: if connections: conn_dict = connections[0] - return UserConnection( - id=conn_dict["id"], - userId=conn_dict["userId"], - authority=conn_dict.get("authority"), - externalId=conn_dict.get("externalId", ""), - externalUsername=conn_dict.get("externalUsername", ""), - externalEmail=conn_dict.get("externalEmail"), - status=conn_dict.get("status", "pending"), - connectedAt=conn_dict.get("connectedAt"), - lastChecked=conn_dict.get("lastChecked"), - expiresAt=conn_dict.get("expiresAt"), - ) + try: + return UserConnection.model_validate(conn_dict) + except Exception: + return UserConnection( + id=conn_dict["id"], + userId=conn_dict["userId"], + authority=conn_dict.get("authority"), + externalId=conn_dict.get("externalId", ""), + externalUsername=conn_dict.get("externalUsername", ""), + externalEmail=conn_dict.get("externalEmail"), + status=conn_dict.get("status", "pending"), + connectedAt=conn_dict.get("connectedAt"), + lastChecked=conn_dict.get("lastChecked"), + expiresAt=conn_dict.get("expiresAt"), + ) return None except Exception as e: logger.error(f"Error getting user connection by ID: {str(e)}") diff --git a/modules/routes/routeDataConnections.py b/modules/routes/routeDataConnections.py index b8ccf4bf..51549d6a 100644 --- a/modules/routes/routeDataConnections.py +++ b/modules/routes/routeDataConnections.py @@ -351,11 +351,18 @@ def create_connection( externalUsername="", # Will be set after OAuth status=ConnectionStatus.PENDING # Start with PENDING status ) - + + # Apply knowledge consent + preferences from request body before persisting + knowledge_enabled = connection_data.get("knowledgeIngestionEnabled") + if isinstance(knowledge_enabled, bool): + connection.knowledgeIngestionEnabled = knowledge_enabled + knowledge_prefs = connection_data.get("knowledgePreferences") + if isinstance(knowledge_prefs, dict): + connection.knowledgePreferences = knowledge_prefs + # Save connection record - models now handle timestamp serialization automatically interface.db.recordModify(UserConnection, connection.id, connection.model_dump()) - - + return connection except HTTPException: diff --git a/modules/routes/routeSecurityClickup.py b/modules/routes/routeSecurityClickup.py index 698e3ca1..d6f71d20 100644 --- a/modules/routes/routeSecurityClickup.py +++ b/modules/routes/routeSecurityClickup.py @@ -244,12 +244,23 @@ async def auth_connect_callback( try: from modules.shared.callbackRegistry import callbackRegistry - callbackRegistry.trigger( - "connection.established", - connectionId=connection.id, - authority=str(getattr(connection.authority, "value", connection.authority) or "clickup"), - userId=str(user.id), - ) + if connection.knowledgeIngestionEnabled: + callbackRegistry.trigger( + "connection.established", + connectionId=connection.id, + authority=str(getattr(connection.authority, "value", connection.authority) or "clickup"), + userId=str(user.id), + ) + else: + logger.info( + "ingestion.connection.bootstrap.skipped — knowledge ingestion disabled by user", + extra={ + "event": "ingestion.connection.bootstrap.skipped", + "connectionId": connection.id, + "authority": "clickup", + "reason": "consent_disabled", + }, + ) except Exception as _cbErr: logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr) diff --git a/modules/routes/routeSecurityGoogle.py b/modules/routes/routeSecurityGoogle.py index 2b6a70f5..7b6c1c64 100644 --- a/modules/routes/routeSecurityGoogle.py +++ b/modules/routes/routeSecurityGoogle.py @@ -482,12 +482,23 @@ async def auth_connect_callback( try: from modules.shared.callbackRegistry import callbackRegistry - callbackRegistry.trigger( - "connection.established", - connectionId=connection.id, - authority=str(getattr(connection.authority, "value", connection.authority) or "google"), - userId=str(user.id), - ) + if connection.knowledgeIngestionEnabled: + callbackRegistry.trigger( + "connection.established", + connectionId=connection.id, + authority=str(getattr(connection.authority, "value", connection.authority) or "google"), + userId=str(user.id), + ) + else: + logger.info( + "ingestion.connection.bootstrap.skipped — knowledge ingestion disabled by user", + extra={ + "event": "ingestion.connection.bootstrap.skipped", + "connectionId": connection.id, + "authority": "google", + "reason": "consent_disabled", + }, + ) except Exception as _cbErr: logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr) diff --git a/modules/routes/routeSecurityMsft.py b/modules/routes/routeSecurityMsft.py index e087a44c..a2768a2b 100644 --- a/modules/routes/routeSecurityMsft.py +++ b/modules/routes/routeSecurityMsft.py @@ -423,12 +423,23 @@ async def auth_connect_callback( try: from modules.shared.callbackRegistry import callbackRegistry - callbackRegistry.trigger( - "connection.established", - connectionId=connection.id, - authority=str(getattr(connection.authority, "value", connection.authority) or "msft"), - userId=str(user.id), - ) + if connection.knowledgeIngestionEnabled: + callbackRegistry.trigger( + "connection.established", + connectionId=connection.id, + authority=str(getattr(connection.authority, "value", connection.authority) or "msft"), + userId=str(user.id), + ) + else: + logger.info( + "ingestion.connection.bootstrap.skipped — knowledge ingestion disabled by user", + extra={ + "event": "ingestion.connection.bootstrap.skipped", + "connectionId": connection.id, + "authority": "msft", + "reason": "consent_disabled", + }, + ) except Exception as _cbErr: logger.warning("connection.established callback failed for %s: %s", connection.id, _cbErr) diff --git a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py index 0267e2fd..6698e164 100644 --- a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py +++ b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py @@ -48,6 +48,9 @@ class IngestionJob: containerPath: Optional[str] = None contentVersion: Optional[str] = None provenance: Optional[Dict[str, Any]] = None + # Connector-driven neutralization: True when the user opted in via §2.6 preferences. + # For sourceKind == "file", _indexFileInternal resolves this from FileItem.neutralize instead. + neutralize: bool = False @dataclass @@ -205,6 +208,7 @@ class KnowledgeService: containerPath=job.containerPath, sourceKind=job.sourceKind, connectionId=(job.provenance or {}).get("connectionId"), + neutralize=job.neutralize, ) except Exception as exc: logger.error( @@ -391,6 +395,7 @@ class KnowledgeService: containerPath: str = None, sourceKind: str = "file", connectionId: Optional[str] = None, + neutralize: bool = False, ) -> FileContentIndex: """Index a file's content objects and create embeddings for text chunks. @@ -421,7 +426,7 @@ class KnowledgeService: resolvedMandateId = mandateId resolvedFeatureInstanceId = featureInstanceId resolvedUserId = userId - _shouldNeutralize = False + _shouldNeutralize = neutralize # caller-supplied flag (connector prefs / IngestionJob) if sourceKind == "file": try: from modules.datamodels.datamodelFiles import FileItem as _FileItem @@ -435,7 +440,7 @@ class KnowledgeService: if _fileRecords: _fileRecord = _fileRecords[0] _get = (lambda k, d=None: _fileRecord.get(k, d)) if isinstance(_fileRecord, dict) else (lambda k, d=None: getattr(_fileRecord, k, d)) - _shouldNeutralize = bool(_get("neutralize", False)) + _shouldNeutralize = bool(_get("neutralize", False)) # FileItem is authoritative for uploads _fileScope = _get("scope") if _fileScope: resolvedScope = _fileScope diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py index f9b3533d..e27e2d29 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py @@ -135,6 +135,27 @@ async def _bootstrapJobHandler( progressCb(5, f"resolving {authority} connection") + # Defensive consent check: if the connection has since disabled knowledge ingestion + # (e.g. user toggled setting after the job was enqueued), skip all walkers. + try: + from modules.interfaces.interfaceDbApp import getRootInterface + _root = getRootInterface() + _conn = _root.getUserConnectionById(connectionId) + if _conn and not getattr(_conn, "knowledgeIngestionEnabled", True): + logger.info( + "ingestion.connection.bootstrap.skipped — consent disabled connectionId=%s", + connectionId, + extra={ + "event": "ingestion.connection.bootstrap.skipped", + "connectionId": connectionId, + "authority": authority, + "reason": "consent_disabled", + }, + ) + return {"connectionId": connectionId, "authority": authority, "skipped": True, "reason": "consent_disabled"} + except Exception as _guardErr: + logger.debug("Could not load connection for consent guard: %s", _guardErr) + def _normalize(res: Any, label: str) -> Dict[str, Any]: if isinstance(res, Exception): logger.error( diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorPrefs.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorPrefs.py new file mode 100644 index 00000000..950400ce --- /dev/null +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorPrefs.py @@ -0,0 +1,101 @@ +"""Per-connection knowledge ingestion preference helpers. + +Walkers call `loadConnectionPrefs(connectionId)` once at bootstrap start and +receive a `ConnectionIngestionPrefs` dataclass they can pass down into their +inner loops. All fields have safe defaults so walkers stay backward-compatible +with connections that predate the §2.6 preference schema (knowledgePreferences +is None). +""" +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + +_DEFAULT_MAX_AGE_DAYS = 90 +_DEFAULT_MAIL_DEPTH = "full" +_DEFAULT_CLICKUP_SCOPE = "title_description" + + +@dataclass +class ConnectionIngestionPrefs: + """Parsed per-connection preferences for knowledge ingestion walkers.""" + + # PII + neutralizeBeforeEmbed: bool = False + + # Mail (Outlook + Gmail) + mailContentDepth: str = _DEFAULT_MAIL_DEPTH # "metadata" | "snippet" | "full" + mailIndexAttachments: bool = False + + # Files (Drive / SharePoint / OneDrive) + filesIndexBinaries: bool = True + mimeAllowlist: List[str] = field(default_factory=list) # empty = all allowed + + # ClickUp + clickupScope: str = _DEFAULT_CLICKUP_SCOPE # "titles" | "title_description" | "with_comments" + clickupIndexAttachments: bool = False + + # Per-authority surface toggles (default everything on) + gmailEnabled: bool = True + driveEnabled: bool = True + sharepointEnabled: bool = True + outlookEnabled: bool = True + + # Time window + maxAgeDays: int = _DEFAULT_MAX_AGE_DAYS # 0 = no limit + + +def loadConnectionPrefs(connectionId: str) -> ConnectionIngestionPrefs: + """Load and parse per-connection preferences from the database. + + Returns safe defaults for any missing or unparseable values so walkers + never fail due to missing preference data. + """ + try: + from modules.interfaces.interfaceDbApp import getRootInterface + root = getRootInterface() + conn = root.getUserConnectionById(connectionId) + if not conn: + logger.debug("loadConnectionPrefs: connection %s not found, using defaults", connectionId) + return ConnectionIngestionPrefs() + + raw: Optional[Dict[str, Any]] = getattr(conn, "knowledgePreferences", None) + if not raw or not isinstance(raw, dict): + return ConnectionIngestionPrefs() + + def _bool(key: str, default: bool) -> bool: + v = raw.get(key) + return bool(v) if isinstance(v, bool) else default + + def _str(key: str, allowed: List[str], default: str) -> str: + v = raw.get(key) + return v if v in allowed else default + + def _int(key: str, default: int) -> int: + v = raw.get(key) + return int(v) if isinstance(v, int) else default + + surface = raw.get("surfaceToggles") or {} + google_surf = surface.get("google") or {} + msft_surf = surface.get("msft") or {} + + return ConnectionIngestionPrefs( + neutralizeBeforeEmbed=_bool("neutralizeBeforeEmbed", False), + mailContentDepth=_str("mailContentDepth", ["metadata", "snippet", "full"], _DEFAULT_MAIL_DEPTH), + mailIndexAttachments=_bool("mailIndexAttachments", False), + filesIndexBinaries=_bool("filesIndexBinaries", True), + mimeAllowlist=list(raw.get("mimeAllowlist") or []), + clickupScope=_str("clickupScope", ["titles", "title_description", "with_comments"], _DEFAULT_CLICKUP_SCOPE), + clickupIndexAttachments=_bool("clickupIndexAttachments", False), + gmailEnabled=bool(google_surf.get("gmail", True)), + driveEnabled=bool(google_surf.get("drive", True)), + sharepointEnabled=bool(msft_surf.get("sharepoint", True)), + outlookEnabled=bool(msft_surf.get("outlook", True)), + maxAgeDays=_int("maxAgeDays", _DEFAULT_MAX_AGE_DAYS), + ) + except Exception as exc: + logger.warning("loadConnectionPrefs failed for %s, using defaults: %s", connectionId, exc) + return ConnectionIngestionPrefs() diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py index 16e94e59..31ac9687 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py @@ -46,6 +46,10 @@ class ClickupBootstrapLimits: # ClickUp `closed` tasks often carry the most useful RAG context # ("why was this shipped the way it was?"). includeClosed: bool = True + # Pass-through to IngestionJob.neutralize + neutralize: bool = False + # Content scope: "titles" | "title_description" | "with_comments" + clickupScope: str = "title_description" @dataclass @@ -88,7 +92,14 @@ def _isRecent(dateUpdatedMs: Any, maxAgeDays: Optional[int]) -> bool: def _buildContentObjects(task: Dict[str, Any], limits: ClickupBootstrapLimits) -> List[Dict[str, Any]]: - """Header (name/status/metadata) + description + text_content, all text.""" + """Header (name/status/metadata) + optional description + text_content. + + `limits.clickupScope` controls how much is embedded: + - "titles": task name + status metadata only + - "title_description": header + description / text_content (default) + - "with_comments": header + description + text_content + (comments themselves are not yet fetched in v1) + """ name = task.get("name") or f"Task {task.get('id', '')}" status = ((task.get("status") or {}).get("status")) or "" assignees = ", ".join( @@ -129,24 +140,26 @@ def _buildContentObjects(task: Dict[str, Any], limits: ClickupBootstrapLimits) - "contextRef": {"part": "header"}, }] - description = _truncate(task.get("description"), limits.maxDescriptionChars) - if description: - parts.append({ - "contentObjectId": "description", - "contentType": "text", - "data": description, - "contextRef": {"part": "description"}, - }) - # text_content is ClickUp's rendered-markdown version; include if it adds - # something beyond the plain description (common for bullet lists, checklists). - textContent = _truncate(task.get("text_content"), limits.maxDescriptionChars) - if textContent and textContent != description: - parts.append({ - "contentObjectId": "text_content", - "contentType": "text", - "data": textContent, - "contextRef": {"part": "text_content"}, - }) + scope = getattr(limits, "clickupScope", "title_description") + if scope in ("title_description", "with_comments"): + description = _truncate(task.get("description"), limits.maxDescriptionChars) + if description: + parts.append({ + "contentObjectId": "description", + "contentType": "text", + "data": description, + "contextRef": {"part": "description"}, + }) + # text_content is ClickUp's rendered-markdown version; include if it adds + # something beyond the plain description (common for bullet lists, checklists). + textContent = _truncate(task.get("text_content"), limits.maxDescriptionChars) + if textContent and textContent != description: + parts.append({ + "contentObjectId": "text_content", + "contentType": "text", + "data": textContent, + "contextRef": {"part": "text_content"}, + }) return parts @@ -160,7 +173,16 @@ async def bootstrapClickup( limits: Optional[ClickupBootstrapLimits] = None, ) -> Dict[str, Any]: """Walk workspaces → lists → tasks and ingest each task as a virtual doc.""" - limits = limits or ClickupBootstrapLimits() + from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs + prefs = loadConnectionPrefs(connectionId) + + if not limits: + limits = ClickupBootstrapLimits( + maxAgeDays=prefs.maxAgeDays if prefs.maxAgeDays > 0 else None, + neutralize=prefs.neutralizeBeforeEmbed, + clickupScope=prefs.clickupScope, + ) + startMs = time.time() result = ClickupBootstrapResult(connectionId=connectionId) @@ -406,6 +428,7 @@ async def _ingestTask( mandateId=mandateId, contentObjects=contentObjects, contentVersion=revision or None, + neutralize=limits.neutralize, provenance={ "connectionId": connectionId, "authority": "clickup", @@ -416,7 +439,7 @@ async def _ingestTask( "spaceId": ((task.get("space") or {}).get("id")), "url": task.get("url"), "status": ((task.get("status") or {}).get("status")), - "tier": "body", + "tier": limits.clickupScope, }, ) ) diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py index 3e73a040..5e4e659b 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py @@ -43,6 +43,10 @@ class GdriveBootstrapLimits: maxDepth: int = MAX_DEPTH_DEFAULT # Only ingest files modified within the last N days. None disables filter. maxAgeDays: Optional[int] = MAX_AGE_DAYS_DEFAULT + # Pass-through to IngestionJob.neutralize + neutralize: bool = False + # Whether to skip binary/non-text files + filesIndexBinaries: bool = True @dataclass @@ -115,7 +119,16 @@ async def bootstrapGdrive( runExtractionFn: Optional[Callable[..., Any]] = None, ) -> Dict[str, Any]: """Walk My Drive starting from the virtual root folder.""" - limits = limits or GdriveBootstrapLimits() + from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs + prefs = loadConnectionPrefs(connectionId) + + if not limits: + limits = GdriveBootstrapLimits( + maxAgeDays=prefs.maxAgeDays if prefs.maxAgeDays > 0 else None, + neutralize=prefs.neutralizeBeforeEmbed, + filesIndexBinaries=prefs.filesIndexBinaries, + ) + startMs = time.time() result = GdriveBootstrapResult(connectionId=connectionId) @@ -352,6 +365,7 @@ async def _ingestOne( mandateId=mandateId, contentObjects=contentObjects, contentVersion=revision, + neutralize=limits.neutralize, provenance={ "connectionId": connectionId, "authority": "google", diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py index 827add6b..21fec83d 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py @@ -42,6 +42,10 @@ class GmailBootstrapLimits: maxAttachmentBytes: int = MAX_ATTACHMENT_BYTES_DEFAULT # Only fetch messages newer than N days. None disables filter. maxAgeDays: Optional[int] = 90 + # Content depth: "metadata" | "snippet" | "full" + mailContentDepth: str = "full" + # Pass-through to IngestionJob.neutralize + neutralize: bool = False @dataclass @@ -112,7 +116,18 @@ def _headerMap(payload: Dict[str, Any]) -> Dict[str, str]: } -def _buildContentObjects(message: Dict[str, Any], maxBodyChars: int) -> List[Dict[str, Any]]: +def _buildContentObjects( + message: Dict[str, Any], + maxBodyChars: int, + mailContentDepth: str = "full", +) -> List[Dict[str, Any]]: + """Build content objects for a Gmail message. + + `mailContentDepth` controls how much is embedded: + - "metadata": header only (subject, from, to, date) + - "snippet": header + Gmail snippet (~155 chars, no full body) + - "full": header + snippet + cleaned full body (default) + """ payload = message.get("payload") or {} headers = _headerMap(payload) subject = headers.get("subject") or "(no subject)" @@ -122,10 +137,6 @@ def _buildContentObjects(message: Dict[str, Any], maxBodyChars: int) -> List[Dic date = headers.get("date") or "" snippet = message.get("snippet") or "" - bodies = _walkPayloadForBody(payload) - rawBody = bodies["text"] or bodies["html"] - cleanedBody = cleanEmailBody(rawBody, maxChars=maxBodyChars) if rawBody else "" - parts: List[Dict[str, Any]] = [] header = ( f"Subject: {subject}\n" @@ -140,20 +151,24 @@ def _buildContentObjects(message: Dict[str, Any], maxBodyChars: int) -> List[Dic "data": header, "contextRef": {"part": "header"}, }) - if snippet: + if mailContentDepth in ("snippet", "full") and snippet: parts.append({ "contentObjectId": "snippet", "contentType": "text", "data": snippet, "contextRef": {"part": "snippet"}, }) - if cleanedBody: - parts.append({ - "contentObjectId": "body", - "contentType": "text", - "data": cleanedBody, - "contextRef": {"part": "body"}, - }) + if mailContentDepth == "full": + bodies = _walkPayloadForBody(payload) + rawBody = bodies["text"] or bodies["html"] + cleanedBody = cleanEmailBody(rawBody, maxChars=maxBodyChars) if rawBody else "" + if cleanedBody: + parts.append({ + "contentObjectId": "body", + "contentType": "text", + "data": cleanedBody, + "contextRef": {"part": "body"}, + }) return parts @@ -168,7 +183,17 @@ async def bootstrapGmail( googleGetFn: Optional[Callable[..., Any]] = None, ) -> Dict[str, Any]: """Enumerate Gmail labels (INBOX + SENT default) and ingest messages.""" - limits = limits or GmailBootstrapLimits() + from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs + prefs = loadConnectionPrefs(connectionId) + + if not limits: + limits = GmailBootstrapLimits( + includeAttachments=prefs.mailIndexAttachments, + maxAgeDays=prefs.maxAgeDays if prefs.maxAgeDays > 0 else None, + mailContentDepth=prefs.mailContentDepth, + neutralize=prefs.neutralizeBeforeEmbed, + ) + startMs = time.time() result = GmailBootstrapResult(connectionId=connectionId) @@ -344,7 +369,9 @@ async def _ingestMessage( syntheticId = _syntheticMessageId(connectionId, messageId) fileName = f"{subject[:80].strip()}.eml" if subject else f"{messageId}.eml" - contentObjects = _buildContentObjects(message, limits.maxBodyChars) + contentObjects = _buildContentObjects( + message, limits.maxBodyChars, mailContentDepth=limits.mailContentDepth + ) try: handle = await knowledgeService.requestIngestion( IngestionJob( @@ -356,6 +383,7 @@ async def _ingestMessage( mandateId=mandateId, contentObjects=contentObjects, contentVersion=str(revision) if revision else None, + neutralize=limits.neutralize, provenance={ "connectionId": connectionId, "authority": "google", @@ -363,7 +391,7 @@ async def _ingestMessage( "externalItemId": messageId, "label": labelId, "threadId": message.get("threadId"), - "tier": "body", + "tier": limits.mailContentDepth, }, ) ) diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py index b3f425ac..64a3545f 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py @@ -40,6 +40,10 @@ class OutlookBootstrapLimits: maxAttachmentBytes: int = MAX_ATTACHMENT_BYTES_DEFAULT # Only fetch messages newer than N days. None disables filter. maxAgeDays: Optional[int] = 90 + # Content depth: "metadata" | "snippet" | "full" + mailContentDepth: str = "full" + # Pass-through to IngestionJob.neutralize + neutralize: bool = False @dataclass @@ -78,7 +82,18 @@ def _joinRecipients(recipients: List[Dict[str, Any]]) -> str: return ", ".join(filter(None, [_extractRecipient(r) for r in recipients or []])) -def _buildContentObjects(message: Dict[str, Any], maxBodyChars: int) -> List[Dict[str, Any]]: +def _buildContentObjects( + message: Dict[str, Any], + maxBodyChars: int, + mailContentDepth: str = "full", +) -> List[Dict[str, Any]]: + """Build content objects for an Outlook message. + + `mailContentDepth` mirrors the Gmail walker: + - "metadata": header only + - "snippet": header + bodyPreview (~255 chars) + - "full": header + snippet + cleaned body (default) + """ subject = message.get("subject") or "(no subject)" fromAddr = _extractRecipient(message.get("from") or {}) toAddr = _joinRecipients(message.get("toRecipients") or []) @@ -86,14 +101,6 @@ def _buildContentObjects(message: Dict[str, Any], maxBodyChars: int) -> List[Dic received = message.get("receivedDateTime") or "" snippet = message.get("bodyPreview") or "" - body = message.get("body") or {} - bodyContent = body.get("content") or "" - bodyType = (body.get("contentType") or "").lower() - if bodyType == "html" or (bodyContent and "<" in bodyContent and ">" in bodyContent): - cleanedBody = cleanEmailBody(bodyContent, maxChars=maxBodyChars) - else: - cleanedBody = cleanEmailBody(bodyContent, maxChars=maxBodyChars) if bodyContent else "" - parts: List[Dict[str, Any]] = [] header = ( f"Subject: {subject}\n" @@ -108,20 +115,24 @@ def _buildContentObjects(message: Dict[str, Any], maxBodyChars: int) -> List[Dic "data": header, "contextRef": {"part": "header"}, }) - if snippet: + if mailContentDepth in ("snippet", "full") and snippet: parts.append({ "contentObjectId": "snippet", "contentType": "text", "data": snippet, "contextRef": {"part": "snippet"}, }) - if cleanedBody: - parts.append({ - "contentObjectId": "body", - "contentType": "text", - "data": cleanedBody, - "contextRef": {"part": "body"}, - }) + if mailContentDepth == "full": + body = message.get("body") or {} + bodyContent = body.get("content") or "" + cleanedBody = cleanEmailBody(bodyContent, maxChars=maxBodyChars) if bodyContent else "" + if cleanedBody: + parts.append({ + "contentObjectId": "body", + "contentType": "text", + "data": cleanedBody, + "contextRef": {"part": "body"}, + }) return parts @@ -135,7 +146,17 @@ async def bootstrapOutlook( limits: Optional[OutlookBootstrapLimits] = None, ) -> Dict[str, Any]: """Enumerate Outlook folders (inbox + sent by default) and ingest messages.""" - limits = limits or OutlookBootstrapLimits() + from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs + prefs = loadConnectionPrefs(connectionId) + + if not limits: + limits = OutlookBootstrapLimits( + includeAttachments=prefs.mailIndexAttachments, + maxAgeDays=prefs.maxAgeDays if prefs.maxAgeDays > 0 else None, + mailContentDepth=prefs.mailContentDepth, + neutralize=prefs.neutralizeBeforeEmbed, + ) + startMs = time.time() result = OutlookBootstrapResult(connectionId=connectionId) @@ -330,7 +351,9 @@ async def _ingestMessage( syntheticId = _syntheticMessageId(connectionId, messageId) fileName = f"{subject[:80].strip()}.eml" if subject else f"{messageId}.eml" - contentObjects = _buildContentObjects(message, limits.maxBodyChars) + contentObjects = _buildContentObjects( + message, limits.maxBodyChars, mailContentDepth=limits.mailContentDepth + ) # Always at least the header is emitted, so `contentObjects` is non-empty. try: handle = await knowledgeService.requestIngestion( @@ -343,13 +366,14 @@ async def _ingestMessage( mandateId=mandateId, contentObjects=contentObjects, contentVersion=revision, + neutralize=limits.neutralize, provenance={ "connectionId": connectionId, "authority": "msft", "service": "outlook", "externalItemId": messageId, "internetMessageId": message.get("internetMessageId"), - "tier": "body", + "tier": limits.mailContentDepth, }, ) ) @@ -504,6 +528,7 @@ async def _ingestAttachments( userId=userId, mandateId=mandateId, contentObjects=contentObjects, + neutralize=limits.neutralize, provenance={ "connectionId": connectionId, "authority": "msft", diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py index 0bceecac..07fef7a8 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py @@ -39,6 +39,8 @@ class SharepointBootstrapLimits: skipMimePrefixes: tuple = SKIP_MIME_PREFIXES_DEFAULT maxDepth: int = MAX_DEPTH_DEFAULT maxSites: int = MAX_SITES_DEFAULT + # Pass-through to IngestionJob.neutralize + neutralize: bool = False @dataclass @@ -105,7 +107,12 @@ async def bootstrapSharepoint( `connectionId` (and optionally a progressCb) and everything else is resolved against the registered services. """ - limits = limits or SharepointBootstrapLimits() + from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs + prefs = loadConnectionPrefs(connectionId) + + if not limits: + limits = SharepointBootstrapLimits(neutralize=prefs.neutralizeBeforeEmbed) + startMs = time.time() result = SharepointBootstrapResult(connectionId=connectionId) @@ -349,6 +356,7 @@ async def _ingestOne( mandateId=mandateId, contentObjects=contentObjects, contentVersion=revision, + neutralize=limits.neutralize, provenance=provenance, ) ) diff --git a/tests/unit/services/test_p1d_consent_prefs.py b/tests/unit/services/test_p1d_consent_prefs.py new file mode 100644 index 00000000..e00b0dfc --- /dev/null +++ b/tests/unit/services/test_p1d_consent_prefs.py @@ -0,0 +1,298 @@ +#!/usr/bin/env python3 +"""Unit tests for P1d: consent gating, preference parsing, and walker behaviour. + +Tests +----- +1. Bootstrap runner skips when ``knowledgeIngestionEnabled=False``. +2. ``loadConnectionPrefs`` returns safe defaults when preferences are absent. +3. ``loadConnectionPrefs`` maps all §2.6 keys correctly from a full prefs dict. +4. Gmail walker passes ``neutralize=True`` and ``mailContentDepth`` to IngestionJob. +5. Gmail walker produces only a header content-object when depth="metadata". +6. ClickUp walker skips description when scope="titles". +""" + +from __future__ import annotations + +import asyncio +import os +import sys +import types +import unittest +from typing import Any, Dict, Optional +from unittest.mock import AsyncMock, MagicMock, patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) + + +# --------------------------------------------------------------------------- +# 1. Bootstrap runner consent gate +# --------------------------------------------------------------------------- + +class TestBootstrapConsentGate(unittest.TestCase): + """_bootstrapJobHandler must no-op when knowledgeIngestionEnabled is False.""" + + def _makeJob(self, connectionId="c-test", authority="google"): + return {"payload": {"connectionId": connectionId, "authority": authority}} + + def _makeConn(self, enabled: bool): + conn = MagicMock() + conn.knowledgeIngestionEnabled = enabled + return conn + + def test_skips_when_consent_disabled(self): + from modules.serviceCenter.services.serviceKnowledge import subConnectorIngestConsumer as sut + + fake_root = MagicMock() + fake_root.getUserConnectionById.return_value = self._makeConn(False) + + with patch("modules.interfaces.interfaceDbApp.getRootInterface", return_value=fake_root): + result = asyncio.get_event_loop().run_until_complete( + sut._bootstrapJobHandler(self._makeJob(), lambda *a: None) + ) + + assert result.get("skipped") is True + assert result.get("reason") == "consent_disabled" + fake_root.getUserConnectionById.assert_called_once_with("c-test") + + def test_proceeds_when_consent_enabled(self): + """When consent is enabled, the handler should call at least one walker.""" + from modules.serviceCenter.services.serviceKnowledge import subConnectorIngestConsumer as sut + + fake_root = MagicMock() + fake_root.getUserConnectionById.return_value = self._makeConn(True) + + # Patch the inner walker so it doesn't do real I/O. + async def _fakeBootstrap(**kwargs): + return {"indexed": 0} + + with ( + patch("modules.interfaces.interfaceDbApp.getRootInterface", return_value=fake_root), + patch( + "modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGdrive.bootstrapGdrive", + new=AsyncMock(return_value={"indexed": 0}), + ), + patch( + "modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGmail.bootstrapGmail", + new=AsyncMock(return_value={"indexed": 0}), + ), + ): + result = asyncio.get_event_loop().run_until_complete( + sut._bootstrapJobHandler(self._makeJob(authority="google"), lambda *a: None) + ) + + # Should not have 'skipped' at the top level. + assert result.get("skipped") is not True + assert result.get("authority") == "google" + + +# --------------------------------------------------------------------------- +# 2 + 3. loadConnectionPrefs +# --------------------------------------------------------------------------- + +class TestLoadConnectionPrefs(unittest.TestCase): + def _makeConn(self, prefs: Optional[Dict[str, Any]]): + conn = MagicMock() + conn.knowledgePreferences = prefs + return conn + + def _mockRoot(self, prefs): + root = MagicMock() + root.getUserConnectionById.return_value = self._makeConn(prefs) + return root + + def test_returns_safe_defaults_when_prefs_none(self): + from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import ( + ConnectionIngestionPrefs, + loadConnectionPrefs, + ) + + with patch("modules.interfaces.interfaceDbApp.getRootInterface", return_value=self._mockRoot(None)): + prefs = loadConnectionPrefs("x") + + assert prefs.neutralizeBeforeEmbed is False + assert prefs.mailContentDepth == "full" + assert prefs.mailIndexAttachments is False + assert prefs.maxAgeDays == 90 + assert prefs.clickupScope == "title_description" + assert prefs.gmailEnabled is True + assert prefs.driveEnabled is True + + def test_maps_all_keys(self): + from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs + + raw = { + "neutralizeBeforeEmbed": True, + "mailContentDepth": "metadata", + "mailIndexAttachments": True, + "filesIndexBinaries": False, + "clickupScope": "with_comments", + "maxAgeDays": 30, + "surfaceToggles": { + "google": {"gmail": False, "drive": True}, + "msft": {"sharepoint": False, "outlook": True}, + }, + } + + with patch("modules.interfaces.interfaceDbApp.getRootInterface", return_value=self._mockRoot(raw)): + prefs = loadConnectionPrefs("x") + + assert prefs.neutralizeBeforeEmbed is True + assert prefs.mailContentDepth == "metadata" + assert prefs.mailIndexAttachments is True + assert prefs.filesIndexBinaries is False + assert prefs.clickupScope == "with_comments" + assert prefs.maxAgeDays == 30 + assert prefs.gmailEnabled is False + assert prefs.driveEnabled is True + assert prefs.sharepointEnabled is False + assert prefs.outlookEnabled is True + + def test_invalid_depth_falls_back_to_default(self): + from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs + + raw = {"mailContentDepth": "everything_please"} + + with patch("modules.interfaces.interfaceDbApp.getRootInterface", return_value=self._mockRoot(raw)): + prefs = loadConnectionPrefs("x") + + assert prefs.mailContentDepth == "full" + + +# --------------------------------------------------------------------------- +# 4. Gmail walker passes neutralize + mailContentDepth to IngestionJob +# --------------------------------------------------------------------------- + +class TestGmailWalkerPrefs(unittest.TestCase): + def _make_message(self, *, subject="Test", snippet="hello", body_text="full body"): + import base64 + encoded = base64.urlsafe_b64encode(body_text.encode()).decode() + return { + "id": "msg-1", + "historyId": "h-42", + "threadId": "t-1", + "snippet": snippet, + "payload": { + "mimeType": "multipart/alternative", + "headers": [ + {"name": "Subject", "value": subject}, + {"name": "From", "value": "alice@example.com"}, + {"name": "To", "value": "bob@example.com"}, + {"name": "Date", "value": "Mon, 20 Apr 2026 10:00:00 +0000"}, + ], + "parts": [ + { + "mimeType": "text/plain", + "body": {"data": encoded}, + } + ], + }, + } + + def test_neutralize_flag_forwarded(self): + from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGmail import ( + GmailBootstrapLimits, + _ingestMessage, + GmailBootstrapResult, + ) + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob + + captured_jobs = [] + + async def fake_requestIngestion(job: IngestionJob): + captured_jobs.append(job) + return MagicMock(status="indexed", error=None) + + ks = MagicMock() + ks.requestIngestion = fake_requestIngestion + + limits = GmailBootstrapLimits(neutralize=True, mailContentDepth="full") + result = GmailBootstrapResult(connectionId="c-1") + + asyncio.get_event_loop().run_until_complete( + _ingestMessage( + googleGetFn=AsyncMock(return_value={}), + knowledgeService=ks, + connectionId="c-1", + mandateId="", + userId="u-1", + labelId="INBOX", + message=self._make_message(), + limits=limits, + result=result, + progressCb=None, + ) + ) + + assert len(captured_jobs) == 1 + assert captured_jobs[0].neutralize is True + + def test_metadata_depth_yields_only_header(self): + from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGmail import ( + _buildContentObjects, + ) + + message = self._make_message(snippet="hi", body_text="should be excluded") + parts = _buildContentObjects(message, maxBodyChars=4000, mailContentDepth="metadata") + ids = [p["contentObjectId"] for p in parts] + assert ids == ["header"] + + def test_snippet_depth_yields_header_and_snippet(self): + from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGmail import ( + _buildContentObjects, + ) + + message = self._make_message(snippet="hi", body_text="should be excluded") + parts = _buildContentObjects(message, maxBodyChars=4000, mailContentDepth="snippet") + ids = [p["contentObjectId"] for p in parts] + assert "header" in ids + assert "snippet" in ids + assert "body" not in ids + + +# --------------------------------------------------------------------------- +# 5. ClickUp walker respects clickupScope="titles" +# --------------------------------------------------------------------------- + +class TestClickupWalkerScope(unittest.TestCase): + def _make_task(self): + return { + "id": "task-1", + "name": "Ship feature X", + "date_updated": "1713888000000", + "description": "This should be omitted", + "text_content": "Also omitted", + "status": {"status": "open"}, + "assignees": [], + "tags": [], + "list": {"name": "Backlog"}, + "folder": {}, + "space": {"name": "Engineering"}, + } + + def test_titles_scope_omits_description(self): + from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncClickup import ( + ClickupBootstrapLimits, + _buildContentObjects, + ) + + limits = ClickupBootstrapLimits(clickupScope="titles") + parts = _buildContentObjects(self._make_task(), limits) + ids = [p["contentObjectId"] for p in parts] + assert ids == ["header"] + assert "description" not in ids + + def test_with_description_scope_includes_description(self): + from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncClickup import ( + ClickupBootstrapLimits, + _buildContentObjects, + ) + + limits = ClickupBootstrapLimits(clickupScope="title_description") + parts = _buildContentObjects(self._make_task(), limits) + ids = [p["contentObjectId"] for p in parts] + assert "header" in ids + assert "description" in ids + + +if __name__ == "__main__": + unittest.main()