diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py index 51acb71c..f9b3533d 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py @@ -135,6 +135,15 @@ async def _bootstrapJobHandler( progressCb(5, f"resolving {authority} connection") + def _normalize(res: Any, label: str) -> Dict[str, Any]: + if isinstance(res, Exception): + logger.error( + "ingestion.connection.bootstrap.failed part=%s connectionId=%s error=%s", + label, connectionId, res, exc_info=res, + ) + return {"error": str(res)} + return res or {} + if authority == "msft": from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint import ( bootstrapSharepoint, @@ -149,16 +158,6 @@ async def _bootstrapJobHandler( bootstrapOutlook(connectionId=connectionId, progressCb=progressCb), return_exceptions=True, ) - - def _normalize(res: Any, label: str) -> Dict[str, Any]: - if isinstance(res, Exception): - logger.error( - "ingestion.connection.bootstrap.failed part=%s connectionId=%s error=%s", - label, connectionId, res, exc_info=res, - ) - return {"error": str(res)} - return res or {} - return { "connectionId": connectionId, "authority": authority, @@ -166,21 +165,55 @@ async def _bootstrapJobHandler( "outlook": _normalize(olResult, "outlook"), } + if authority == "google": + from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGdrive import ( + bootstrapGdrive, + ) + from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGmail import ( + bootstrapGmail, + ) + + progressCb(10, "drive + gmail") + gdResult, gmResult = await asyncio.gather( + bootstrapGdrive(connectionId=connectionId, progressCb=progressCb), + bootstrapGmail(connectionId=connectionId, progressCb=progressCb), + return_exceptions=True, + ) + return { + "connectionId": connectionId, + "authority": authority, + "drive": _normalize(gdResult, "gdrive"), + "gmail": _normalize(gmResult, "gmail"), + } + + if authority == "clickup": + from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncClickup import ( + bootstrapClickup, + ) + + progressCb(10, "clickup tasks") + cuResult = await bootstrapClickup(connectionId=connectionId, progressCb=progressCb) + return { + "connectionId": connectionId, + "authority": authority, + "clickup": _normalize(cuResult, "clickup"), + } + logger.info( - "ingestion.connection.bootstrap.skipped reason=P1_pilot_scope authority=%s connectionId=%s", + "ingestion.connection.bootstrap.skipped reason=unsupported_authority authority=%s connectionId=%s", authority, connectionId, extra={ "event": "ingestion.connection.bootstrap.skipped", "authority": authority, "connectionId": connectionId, - "reason": "P1_pilot_scope", + "reason": "unsupported_authority", }, ) return { "connectionId": connectionId, "authority": authority, "skipped": True, - "reason": "P1_pilot_scope", + "reason": "unsupported_authority", } diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py new file mode 100644 index 00000000..16e94e59 --- /dev/null +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py @@ -0,0 +1,489 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""ClickUp bootstrap for the unified knowledge ingestion lane. 
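+
+Minimal invocation sketch (mirrors the call site in the ingest consumer;
+heavyweight dependencies are resolved lazily inside the function):
+
+    result = await bootstrapClickup(connectionId=connectionId, progressCb=progressCb)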
+ +ClickUp tasks are ingested as *virtual documents* — we never download file +bytes. Each task becomes a `sourceKind="clickup_task"` IngestionJob whose +`contentObjects` carry a summary header (name + status + metadata) and the +task description / text content so retrieval finds them without a live API +call. + +Hierarchy traversal: workspace (team) → spaces → folders / folderless lists → +tasks. We cap the fan-out with `maxWorkspaces` / `maxListsPerWorkspace` / +`maxTasks` and skip tasks older than `maxAgeDays` (default 180 d). + +Idempotency: `date_updated` from the ClickUp task payload is a millisecond +timestamp and strictly monotonic per revision — used as `contentVersion`. +""" + +from __future__ import annotations + +import hashlib +import logging +import time +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from typing import Any, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + +MAX_TASKS_DEFAULT = 500 +MAX_WORKSPACES_DEFAULT = 3 +MAX_LISTS_PER_WORKSPACE_DEFAULT = 20 +MAX_DESCRIPTION_CHARS_DEFAULT = 8000 +MAX_AGE_DAYS_DEFAULT = 180 + + +@dataclass +class ClickupBootstrapLimits: + maxTasks: int = MAX_TASKS_DEFAULT + maxWorkspaces: int = MAX_WORKSPACES_DEFAULT + maxListsPerWorkspace: int = MAX_LISTS_PER_WORKSPACE_DEFAULT + maxDescriptionChars: int = MAX_DESCRIPTION_CHARS_DEFAULT + # Only ingest tasks updated within the last N days. None disables filter. + maxAgeDays: Optional[int] = MAX_AGE_DAYS_DEFAULT + # Include closed/archived tasks if they still meet the recency filter. + # ClickUp `closed` tasks often carry the most useful RAG context + # ("why was this shipped the way it was?"). + includeClosed: bool = True + + +@dataclass +class ClickupBootstrapResult: + connectionId: str + indexed: int = 0 + skippedDuplicate: int = 0 + skippedPolicy: int = 0 + failed: int = 0 + workspaces: int = 0 + lists: int = 0 + errors: List[str] = field(default_factory=list) + + +def _syntheticTaskId(connectionId: str, taskId: str) -> str: + token = hashlib.sha256(f"{connectionId}:{taskId}".encode("utf-8")).hexdigest()[:16] + return f"cu:{connectionId[:8]}:{token}" + + +def _truncate(value: Any, limit: int) -> str: + text = str(value or "").strip() + if not text: + return "" + if len(text) <= limit: + return text + return text[:limit].rstrip() + "\n[truncated]" + + +def _isRecent(dateUpdatedMs: Any, maxAgeDays: Optional[int]) -> bool: + if not maxAgeDays: + return True + if not dateUpdatedMs: + return True + try: + ts = datetime.fromtimestamp(int(dateUpdatedMs) / 1000.0, tz=timezone.utc) + except Exception: + return True + cutoff = datetime.now(timezone.utc) - timedelta(days=maxAgeDays) + return ts >= cutoff + + +def _buildContentObjects(task: Dict[str, Any], limits: ClickupBootstrapLimits) -> List[Dict[str, Any]]: + """Header (name/status/metadata) + description + text_content, all text.""" + name = task.get("name") or f"Task {task.get('id', '')}" + status = ((task.get("status") or {}).get("status")) or "" + assignees = ", ".join( + filter(None, [ + (a.get("username") or a.get("email") or "") + for a in (task.get("assignees") or []) + ]) + ) + tags = ", ".join(filter(None, [t.get("name", "") for t in (task.get("tags") or [])])) + listInfo = task.get("list") or {} + folderInfo = task.get("folder") or {} + spaceInfo = task.get("space") or {} + dueMs = task.get("due_date") + dueIso = "" + if dueMs: + try: + dueIso = datetime.fromtimestamp(int(dueMs) / 1000.0, tz=timezone.utc).strftime("%Y-%m-%d") + except Exception: + dueIso = "" + + 
headerLines = [ + f"Task: {name}", + f"Status: {status}" if status else "", + f"List: {listInfo.get('name', '')}" if listInfo else "", + f"Folder: {folderInfo.get('name', '')}" if folderInfo else "", + f"Space: {spaceInfo.get('name', '')}" if spaceInfo else "", + f"Assignees: {assignees}" if assignees else "", + f"Tags: {tags}" if tags else "", + f"Due: {dueIso}" if dueIso else "", + f"Url: {task.get('url', '')}" if task.get("url") else "", + ] + header = "\n".join(line for line in headerLines if line) + + parts: List[Dict[str, Any]] = [{ + "contentObjectId": "header", + "contentType": "text", + "data": header, + "contextRef": {"part": "header"}, + }] + + description = _truncate(task.get("description"), limits.maxDescriptionChars) + if description: + parts.append({ + "contentObjectId": "description", + "contentType": "text", + "data": description, + "contextRef": {"part": "description"}, + }) + # text_content is ClickUp's rendered-markdown version; include if it adds + # something beyond the plain description (common for bullet lists, checklists). + textContent = _truncate(task.get("text_content"), limits.maxDescriptionChars) + if textContent and textContent != description: + parts.append({ + "contentObjectId": "text_content", + "contentType": "text", + "data": textContent, + "contextRef": {"part": "text_content"}, + }) + return parts + + +async def bootstrapClickup( + connectionId: str, + *, + progressCb: Optional[Callable[[int, Optional[str]], None]] = None, + adapter: Any = None, + connection: Any = None, + knowledgeService: Any = None, + limits: Optional[ClickupBootstrapLimits] = None, +) -> Dict[str, Any]: + """Walk workspaces → lists → tasks and ingest each task as a virtual doc.""" + limits = limits or ClickupBootstrapLimits() + startMs = time.time() + result = ClickupBootstrapResult(connectionId=connectionId) + + logger.info( + "ingestion.connection.bootstrap.started part=clickup connectionId=%s", + connectionId, + extra={ + "event": "ingestion.connection.bootstrap.started", + "part": "clickup", + "connectionId": connectionId, + }, + ) + + if adapter is None or knowledgeService is None or connection is None: + adapter, connection, knowledgeService = await _resolveDependencies(connectionId) + + mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else "" + userId = str(getattr(connection, "userId", "") or "") if connection is not None else "" + + svc = getattr(adapter, "_svc", None) + if svc is None: + result.errors.append("adapter missing _svc instance") + return _finalizeResult(connectionId, result, startMs) + + try: + teamsResp = await svc.getAuthorizedTeams() + except Exception as exc: + logger.error("clickup team discovery failed for %s: %s", connectionId, exc, exc_info=True) + result.errors.append(f"teams: {exc}") + return _finalizeResult(connectionId, result, startMs) + + teams = (teamsResp or {}).get("teams") or [] + for team in teams[: limits.maxWorkspaces]: + if result.indexed + result.skippedDuplicate >= limits.maxTasks: + break + teamId = str(team.get("id", "") or "") + if not teamId: + continue + result.workspaces += 1 + try: + await _walkTeam( + svc=svc, + knowledgeService=knowledgeService, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + team=team, + limits=limits, + result=result, + progressCb=progressCb, + ) + except Exception as exc: + logger.error("clickup team %s walk failed: %s", teamId, exc, exc_info=True) + result.errors.append(f"team({teamId}): {exc}") + + return _finalizeResult(connectionId, result, 
startMs) + + +async def _resolveDependencies(connectionId: str): + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.auth import TokenManager + from modules.connectors.providerClickup.connectorClickup import ClickupConnector + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext + from modules.security.rootAccess import getRootUser + + rootInterface = getRootInterface() + connection = rootInterface.getUserConnectionById(connectionId) + if connection is None: + raise ValueError(f"UserConnection not found: {connectionId}") + + token = TokenManager().getFreshToken(connectionId) + if not token or not token.tokenAccess: + raise ValueError(f"No valid token for connection {connectionId}") + + provider = ClickupConnector(connection, token.tokenAccess) + adapter = provider.getServiceAdapter("clickup") + + rootUser = getRootUser() + ctx = ServiceCenterContext( + user=rootUser, + mandate_id=str(getattr(connection, "mandateId", "") or ""), + ) + knowledgeService = getService("knowledge", ctx) + return adapter, connection, knowledgeService + + +async def _walkTeam( + *, + svc, + knowledgeService, + connectionId: str, + mandateId: str, + userId: str, + team: Dict[str, Any], + limits: ClickupBootstrapLimits, + result: ClickupBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + teamId = str(team.get("id", "") or "") + spacesResp = await svc.getSpaces(teamId) + spaces = (spacesResp or {}).get("spaces") or [] + + listsCollected: List[Dict[str, Any]] = [] + for space in spaces: + if len(listsCollected) >= limits.maxListsPerWorkspace: + break + spaceId = str(space.get("id", "") or "") + if not spaceId: + continue + + # Folderless lists directly under the space + folderless = await svc.getFolderlessLists(spaceId) + for lst in (folderless or {}).get("lists") or []: + if len(listsCollected) >= limits.maxListsPerWorkspace: + break + listsCollected.append({**lst, "_space": space}) + + # Lists inside folders + foldersResp = await svc.getFolders(spaceId) + for folder in (foldersResp or {}).get("folders") or []: + if len(listsCollected) >= limits.maxListsPerWorkspace: + break + folderId = str(folder.get("id", "") or "") + if not folderId: + continue + folderLists = await svc.getListsInFolder(folderId) + for lst in (folderLists or {}).get("lists") or []: + if len(listsCollected) >= limits.maxListsPerWorkspace: + break + listsCollected.append({**lst, "_space": space, "_folder": folder}) + + for lst in listsCollected: + if result.indexed + result.skippedDuplicate >= limits.maxTasks: + return + result.lists += 1 + await _walkList( + svc=svc, + knowledgeService=knowledgeService, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + teamId=teamId, + lst=lst, + limits=limits, + result=result, + progressCb=progressCb, + ) + + +async def _walkList( + *, + svc, + knowledgeService, + connectionId: str, + mandateId: str, + userId: str, + teamId: str, + lst: Dict[str, Any], + limits: ClickupBootstrapLimits, + result: ClickupBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + listId = str(lst.get("id", "") or "") + if not listId: + return + page = 0 + while result.indexed + result.skippedDuplicate < limits.maxTasks: + resp = await svc.getTasksInList( + listId, + page=page, + include_closed=limits.includeClosed, + subtasks=True, + ) + if isinstance(resp, dict) and resp.get("error"): + logger.warning("clickup tasks list=%s page=%d error: %s", listId, 
page, resp.get("error")) + result.errors.append(f"list({listId}): {resp.get('error')}") + return + tasks = (resp or {}).get("tasks") or [] + if not tasks: + return + + for task in tasks: + if result.indexed + result.skippedDuplicate >= limits.maxTasks: + return + if not _isRecent(task.get("date_updated"), limits.maxAgeDays): + result.skippedPolicy += 1 + continue + # Inject the list/folder/space metadata we already loaded. + task["list"] = task.get("list") or {"id": listId, "name": lst.get("name")} + task["folder"] = task.get("folder") or lst.get("_folder") or {} + task["space"] = task.get("space") or lst.get("_space") or {} + await _ingestTask( + knowledgeService=knowledgeService, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + teamId=teamId, + task=task, + limits=limits, + result=result, + progressCb=progressCb, + ) + + if len(tasks) < 100: # ClickUp page-size hint: fewer than 100 => last page + return + page += 1 + + +async def _ingestTask( + *, + knowledgeService, + connectionId: str, + mandateId: str, + userId: str, + teamId: str, + task: Dict[str, Any], + limits: ClickupBootstrapLimits, + result: ClickupBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob + + taskId = str(task.get("id", "") or "") + if not taskId: + result.skippedPolicy += 1 + return + revision = str(task.get("date_updated") or task.get("date_created") or "") + name = task.get("name") or f"Task {taskId}" + syntheticId = _syntheticTaskId(connectionId, taskId) + fileName = f"{name[:80].strip() or taskId}.task.json" + + contentObjects = _buildContentObjects(task, limits) + + try: + handle = await knowledgeService.requestIngestion( + IngestionJob( + sourceKind="clickup_task", + sourceId=syntheticId, + fileName=fileName, + mimeType="application/vnd.clickup.task+json", + userId=userId, + mandateId=mandateId, + contentObjects=contentObjects, + contentVersion=revision or None, + provenance={ + "connectionId": connectionId, + "authority": "clickup", + "service": "clickup", + "externalItemId": taskId, + "teamId": teamId, + "listId": ((task.get("list") or {}).get("id")), + "spaceId": ((task.get("space") or {}).get("id")), + "url": task.get("url"), + "status": ((task.get("status") or {}).get("status")), + "tier": "body", + }, + ) + ) + except Exception as exc: + logger.error("clickup ingestion %s failed: %s", taskId, exc, exc_info=True) + result.failed += 1 + result.errors.append(f"ingest({taskId}): {exc}") + return + + if handle.status == "duplicate": + result.skippedDuplicate += 1 + elif handle.status == "indexed": + result.indexed += 1 + else: + result.failed += 1 + + if progressCb is not None and (result.indexed + result.skippedDuplicate) % 50 == 0: + processed = result.indexed + result.skippedDuplicate + try: + progressCb( + min(90, 10 + int(80 * processed / max(1, limits.maxTasks))), + f"clickup processed={processed}", + ) + except Exception: + pass + logger.info( + "ingestion.connection.bootstrap.progress part=clickup processed=%d skippedDup=%d failed=%d", + processed, result.skippedDuplicate, result.failed, + extra={ + "event": "ingestion.connection.bootstrap.progress", + "part": "clickup", + "connectionId": connectionId, + "processed": processed, + "skippedDup": result.skippedDuplicate, + "failed": result.failed, + }, + ) + + +def _finalizeResult(connectionId: str, result: ClickupBootstrapResult, startMs: float) -> Dict[str, Any]: + durationMs = int((time.time() - 
startMs) * 1000) + logger.info( + "ingestion.connection.bootstrap.done part=clickup connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d workspaces=%d lists=%d durationMs=%d", + connectionId, + result.indexed, result.skippedDuplicate, result.skippedPolicy, + result.failed, result.workspaces, result.lists, durationMs, + extra={ + "event": "ingestion.connection.bootstrap.done", + "part": "clickup", + "connectionId": connectionId, + "indexed": result.indexed, + "skippedDup": result.skippedDuplicate, + "skippedPolicy": result.skippedPolicy, + "failed": result.failed, + "workspaces": result.workspaces, + "lists": result.lists, + "durationMs": durationMs, + }, + ) + return { + "connectionId": result.connectionId, + "indexed": result.indexed, + "skippedDuplicate": result.skippedDuplicate, + "skippedPolicy": result.skippedPolicy, + "failed": result.failed, + "workspaces": result.workspaces, + "lists": result.lists, + "durationMs": durationMs, + "errors": result.errors[:20], + } diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py new file mode 100644 index 00000000..3e73a040 --- /dev/null +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py @@ -0,0 +1,429 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Google Drive bootstrap for the unified knowledge ingestion lane. + +Mirrors the SharePoint pilot (see subConnectorSyncSharepoint.py). Walks the +user's *My Drive* tree from the virtual `root` folder, downloads each +file-like item via `DriveAdapter.download` (which handles native Google docs +via export), runs the standard extraction pipeline and routes results through +`KnowledgeService.requestIngestion` with `sourceKind="gdrive_item"` and +`contentVersion = modifiedTime` (monotonic per-revision). +""" + +from __future__ import annotations + +import hashlib +import logging +import time +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from typing import Any, Callable, Dict, List, Optional + +from modules.datamodels.datamodelExtraction import ExtractionOptions + +logger = logging.getLogger(__name__) + +MAX_ITEMS_DEFAULT = 500 +MAX_BYTES_DEFAULT = 200 * 1024 * 1024 +MAX_FILE_SIZE_DEFAULT = 25 * 1024 * 1024 +SKIP_MIME_PREFIXES_DEFAULT = ("video/", "audio/") +MAX_DEPTH_DEFAULT = 4 +MAX_AGE_DAYS_DEFAULT = 365 + +# Google Drive uses virtual mime-types for folders and non-downloadable assets. +FOLDER_MIME = "application/vnd.google-apps.folder" + + +@dataclass +class GdriveBootstrapLimits: + maxItems: int = MAX_ITEMS_DEFAULT + maxBytes: int = MAX_BYTES_DEFAULT + maxFileSize: int = MAX_FILE_SIZE_DEFAULT + skipMimePrefixes: tuple = SKIP_MIME_PREFIXES_DEFAULT + maxDepth: int = MAX_DEPTH_DEFAULT + # Only ingest files modified within the last N days. None disables filter. 
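+    # e.g. maxAgeDays=None walks everything regardless of age, while
+    # maxAgeDays=30 keeps roughly the last month of edits.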
+ maxAgeDays: Optional[int] = MAX_AGE_DAYS_DEFAULT + + +@dataclass +class GdriveBootstrapResult: + connectionId: str + indexed: int = 0 + skippedDuplicate: int = 0 + skippedPolicy: int = 0 + failed: int = 0 + bytesProcessed: int = 0 + errors: List[str] = field(default_factory=list) + + +def _syntheticFileId(connectionId: str, externalItemId: str) -> str: + token = hashlib.sha256(f"{connectionId}:{externalItemId}".encode("utf-8")).hexdigest()[:16] + return f"gd:{connectionId[:8]}:{token}" + + +def _toContentObjects(extracted, fileName: str) -> List[Dict[str, Any]]: + parts = getattr(extracted, "parts", None) or [] + out: List[Dict[str, Any]] = [] + for part in parts: + data = getattr(part, "data", None) or "" + if not data or not str(data).strip(): + continue + typeGroup = getattr(part, "typeGroup", "text") or "text" + contentType = "text" + if typeGroup == "image": + contentType = "image" + elif typeGroup in ("binary", "container"): + contentType = "other" + out.append({ + "contentObjectId": getattr(part, "id", ""), + "contentType": contentType, + "data": data, + "contextRef": { + "containerPath": fileName, + "location": getattr(part, "label", None) or "file", + **(getattr(part, "metadata", None) or {}), + }, + }) + return out + + +def _isRecent(modifiedIso: Optional[str], maxAgeDays: Optional[int]) -> bool: + if not maxAgeDays: + return True + if not modifiedIso: + # No timestamp -> be permissive (Drive native docs sometimes omit it on export). + return True + try: + # Google returns RFC 3339 with `Z` or offset; python 3.11+ parses both. + ts = datetime.fromisoformat(modifiedIso.replace("Z", "+00:00")) + except Exception: + return True + cutoff = datetime.now(timezone.utc) - timedelta(days=maxAgeDays) + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + return ts >= cutoff + + +async def bootstrapGdrive( + connectionId: str, + *, + progressCb: Optional[Callable[[int, Optional[str]], None]] = None, + adapter: Any = None, + connection: Any = None, + knowledgeService: Any = None, + limits: Optional[GdriveBootstrapLimits] = None, + runExtractionFn: Optional[Callable[..., Any]] = None, +) -> Dict[str, Any]: + """Walk My Drive starting from the virtual root folder.""" + limits = limits or GdriveBootstrapLimits() + startMs = time.time() + result = GdriveBootstrapResult(connectionId=connectionId) + + logger.info( + "ingestion.connection.bootstrap.started part=gdrive connectionId=%s", + connectionId, + extra={ + "event": "ingestion.connection.bootstrap.started", + "part": "gdrive", + "connectionId": connectionId, + }, + ) + + if adapter is None or knowledgeService is None or connection is None: + adapter, connection, knowledgeService = await _resolveDependencies(connectionId) + if runExtractionFn is None: + from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction + from modules.serviceCenter.services.serviceExtraction.subRegistry import ( + ExtractorRegistry, ChunkerRegistry, + ) + extractorRegistry = ExtractorRegistry() + chunkerRegistry = ChunkerRegistry() + + def runExtractionFn(bytesData, name, mime, options): # type: ignore[no-redef] + return runExtraction(extractorRegistry, chunkerRegistry, bytesData, name, mime, options) + + mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else "" + userId = str(getattr(connection, "userId", "") or "") if connection is not None else "" + + try: + await _walkFolder( + adapter=adapter, + knowledgeService=knowledgeService, + runExtractionFn=runExtractionFn, + 
connectionId=connectionId, + mandateId=mandateId, + userId=userId, + folderPath="/", # DriveAdapter.browse maps "" / "/" -> "root" + depth=0, + limits=limits, + result=result, + progressCb=progressCb, + ) + except Exception as exc: + logger.error("gdrive walk failed for %s: %s", connectionId, exc, exc_info=True) + result.errors.append(f"walk: {exc}") + + return _finalizeResult(connectionId, result, startMs) + + +async def _resolveDependencies(connectionId: str): + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.auth import TokenManager + from modules.connectors.providerGoogle.connectorGoogle import GoogleConnector + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext + from modules.security.rootAccess import getRootUser + + rootInterface = getRootInterface() + connection = rootInterface.getUserConnectionById(connectionId) + if connection is None: + raise ValueError(f"UserConnection not found: {connectionId}") + + token = TokenManager().getFreshToken(connectionId) + if not token or not token.tokenAccess: + raise ValueError(f"No valid token for connection {connectionId}") + + provider = GoogleConnector(connection, token.tokenAccess) + adapter = provider.getServiceAdapter("drive") + + rootUser = getRootUser() + ctx = ServiceCenterContext( + user=rootUser, + mandate_id=str(getattr(connection, "mandateId", "") or ""), + ) + knowledgeService = getService("knowledge", ctx) + return adapter, connection, knowledgeService + + +async def _walkFolder( + *, + adapter, + knowledgeService, + runExtractionFn, + connectionId: str, + mandateId: str, + userId: str, + folderPath: str, + depth: int, + limits: GdriveBootstrapLimits, + result: GdriveBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + if depth > limits.maxDepth: + return + try: + entries = await adapter.browse(folderPath) + except Exception as exc: + logger.warning("gdrive browse %s failed: %s", folderPath, exc) + result.errors.append(f"browse({folderPath}): {exc}") + return + + for entry in entries: + if result.indexed + result.skippedDuplicate >= limits.maxItems: + return + if result.bytesProcessed >= limits.maxBytes: + return + + entryPath = getattr(entry, "path", "") or "" + metadata = getattr(entry, "metadata", {}) or {} + mimeType = getattr(entry, "mimeType", None) or metadata.get("mimeType") + + if getattr(entry, "isFolder", False) or mimeType == FOLDER_MIME: + await _walkFolder( + adapter=adapter, + knowledgeService=knowledgeService, + runExtractionFn=runExtractionFn, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + folderPath=entryPath, + depth=depth + 1, + limits=limits, + result=result, + progressCb=progressCb, + ) + continue + + effectiveMime = mimeType or "application/octet-stream" + if any(effectiveMime.startswith(prefix) for prefix in limits.skipMimePrefixes): + result.skippedPolicy += 1 + continue + size = int(getattr(entry, "size", 0) or 0) + if size and size > limits.maxFileSize: + result.skippedPolicy += 1 + continue + modifiedTime = metadata.get("modifiedTime") + if not _isRecent(modifiedTime, limits.maxAgeDays): + result.skippedPolicy += 1 + continue + + externalItemId = metadata.get("id") or entryPath + revision = modifiedTime + + await _ingestOne( + adapter=adapter, + knowledgeService=knowledgeService, + runExtractionFn=runExtractionFn, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + entry=entry, + entryPath=entryPath, + mimeType=effectiveMime, + 
externalItemId=externalItemId, + revision=revision, + limits=limits, + result=result, + progressCb=progressCb, + ) + + +async def _ingestOne( + *, + adapter, + knowledgeService, + runExtractionFn, + connectionId: str, + mandateId: str, + userId: str, + entry, + entryPath: str, + mimeType: str, + externalItemId: str, + revision: Optional[str], + limits: GdriveBootstrapLimits, + result: GdriveBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob + + syntheticFileId = _syntheticFileId(connectionId, externalItemId) + fileName = getattr(entry, "name", "") or externalItemId + + try: + downloaded = await adapter.download(entryPath) + except Exception as exc: + logger.warning("gdrive download %s failed: %s", entryPath, exc) + result.failed += 1 + result.errors.append(f"download({entryPath}): {exc}") + return + + # Adapter.download returns raw bytes today; guard DownloadResult shape too. + fileBytes: bytes + if isinstance(downloaded, (bytes, bytearray)): + fileBytes = bytes(downloaded) + else: + fileBytes = bytes(getattr(downloaded, "data", b"") or b"") + if getattr(downloaded, "mimeType", None): + mimeType = downloaded.mimeType # export may have changed the type + if not fileBytes: + result.failed += 1 + return + if len(fileBytes) > limits.maxFileSize: + result.skippedPolicy += 1 + return + + result.bytesProcessed += len(fileBytes) + + try: + extracted = runExtractionFn( + fileBytes, fileName, mimeType, + ExtractionOptions(mergeStrategy=None), + ) + except Exception as exc: + logger.warning("gdrive extraction %s failed: %s", entryPath, exc) + result.failed += 1 + result.errors.append(f"extract({entryPath}): {exc}") + return + + contentObjects = _toContentObjects(extracted, fileName) + if not contentObjects: + result.skippedPolicy += 1 + return + + try: + handle = await knowledgeService.requestIngestion( + IngestionJob( + sourceKind="gdrive_item", + sourceId=syntheticFileId, + fileName=fileName, + mimeType=mimeType, + userId=userId, + mandateId=mandateId, + contentObjects=contentObjects, + contentVersion=revision, + provenance={ + "connectionId": connectionId, + "authority": "google", + "service": "drive", + "externalItemId": externalItemId, + "entryPath": entryPath, + "tier": "body", + }, + ) + ) + except Exception as exc: + logger.error("gdrive ingestion %s failed: %s", entryPath, exc, exc_info=True) + result.failed += 1 + result.errors.append(f"ingest({entryPath}): {exc}") + return + + if handle.status == "duplicate": + result.skippedDuplicate += 1 + elif handle.status == "indexed": + result.indexed += 1 + else: + result.failed += 1 + + if progressCb is not None and (result.indexed + result.skippedDuplicate) % 50 == 0: + processed = result.indexed + result.skippedDuplicate + try: + progressCb( + min(90, 10 + int(80 * processed / max(1, limits.maxItems))), + f"gdrive processed={processed}", + ) + except Exception: + pass + logger.info( + "ingestion.connection.bootstrap.progress part=gdrive processed=%d skippedDup=%d failed=%d", + processed, result.skippedDuplicate, result.failed, + extra={ + "event": "ingestion.connection.bootstrap.progress", + "part": "gdrive", + "connectionId": connectionId, + "processed": processed, + "skippedDup": result.skippedDuplicate, + "failed": result.failed, + }, + ) + + +def _finalizeResult(connectionId: str, result: GdriveBootstrapResult, startMs: float) -> Dict[str, Any]: + durationMs = int((time.time() - startMs) * 1000) + 
logger.info( + "ingestion.connection.bootstrap.done part=gdrive connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d bytes=%d durationMs=%d", + connectionId, + result.indexed, result.skippedDuplicate, result.skippedPolicy, + result.failed, result.bytesProcessed, durationMs, + extra={ + "event": "ingestion.connection.bootstrap.done", + "part": "gdrive", + "connectionId": connectionId, + "indexed": result.indexed, + "skippedDup": result.skippedDuplicate, + "skippedPolicy": result.skippedPolicy, + "failed": result.failed, + "bytes": result.bytesProcessed, + "durationMs": durationMs, + }, + ) + return { + "connectionId": result.connectionId, + "indexed": result.indexed, + "skippedDuplicate": result.skippedDuplicate, + "skippedPolicy": result.skippedPolicy, + "failed": result.failed, + "bytesProcessed": result.bytesProcessed, + "durationMs": durationMs, + "errors": result.errors[:20], + } diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py new file mode 100644 index 00000000..827add6b --- /dev/null +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py @@ -0,0 +1,578 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Gmail bootstrap for the unified knowledge ingestion lane. + +Mirrors the Outlook pilot (see subConnectorSyncOutlook.py) but talks to Google +Mail's REST API. Messages become `sourceKind="gmail_message"` virtual documents +with header / snippet / cleaned body content-objects; attachments are optional +child jobs with `sourceKind="gmail_attachment"`. + +Idempotency: Gmail's stable `historyId` (or `internalDate` as fallback) is +passed as `contentVersion`, so rerunning the bootstrap yields +`ingestion.skipped.duplicate` for unchanged messages. +""" + +from __future__ import annotations + +import asyncio +import base64 +import hashlib +import logging +import time +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from typing import Any, Callable, Dict, List, Optional + +from modules.serviceCenter.services.serviceKnowledge.subTextClean import cleanEmailBody + +logger = logging.getLogger(__name__) + +MAX_MESSAGES_DEFAULT = 500 +MAX_BODY_CHARS_DEFAULT = 8000 +MAX_ATTACHMENT_BYTES_DEFAULT = 10 * 1024 * 1024 +DEFAULT_LABELS = ("INBOX", "SENT") + + +@dataclass +class GmailBootstrapLimits: + maxMessages: int = MAX_MESSAGES_DEFAULT + labels: tuple = DEFAULT_LABELS + maxBodyChars: int = MAX_BODY_CHARS_DEFAULT + includeAttachments: bool = False + maxAttachmentBytes: int = MAX_ATTACHMENT_BYTES_DEFAULT + # Only fetch messages newer than N days. None disables filter. + maxAgeDays: Optional[int] = 90 + + +@dataclass +class GmailBootstrapResult: + connectionId: str + indexed: int = 0 + skippedDuplicate: int = 0 + skippedPolicy: int = 0 + failed: int = 0 + attachmentsIndexed: int = 0 + errors: List[str] = field(default_factory=list) + + +def _syntheticMessageId(connectionId: str, messageId: str) -> str: + token = hashlib.sha256(f"{connectionId}:{messageId}".encode("utf-8")).hexdigest()[:16] + return f"gm:{connectionId[:8]}:{token}" + + +def _syntheticAttachmentId(connectionId: str, messageId: str, attachmentId: str) -> str: + token = hashlib.sha256( + f"{connectionId}:{messageId}:{attachmentId}".encode("utf-8") + ).hexdigest()[:16] + return f"ga:{connectionId[:8]}:{token}" + + +def _decodeBase64Url(data: str) -> bytes: + if not data: + return b"" + # Gmail uses URL-safe base64 without padding. 
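+    # e.g. "aGVsbG8" has len 7 (7 % 4 == 3), so one "=" is appended and the
+    # padded string decodes to b"hello".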
+ padding = 4 - (len(data) % 4) + if padding != 4: + data = data + ("=" * padding) + try: + return base64.urlsafe_b64decode(data) + except Exception: + return b"" + + +def _walkPayloadForBody(payload: Dict[str, Any]) -> Dict[str, str]: + """Return {"text": ..., "html": ...} by walking MIME parts. + + Gmail `payload` is a tree of parts. We prefer `text/plain` for the cleaned + body, but capture `text/html` as a fallback so `cleanEmailBody` can strip + markup if plain is missing. + """ + found: Dict[str, str] = {"text": "", "html": ""} + + def _walk(part: Dict[str, Any]) -> None: + mime = (part.get("mimeType") or "").lower() + body = part.get("body") or {} + raw = body.get("data") or "" + if raw and mime.startswith("text/"): + decoded = _decodeBase64Url(raw).decode("utf-8", errors="replace") + key = "text" if mime == "text/plain" else ("html" if mime == "text/html" else "") + if key and not found[key]: + found[key] = decoded + for sub in part.get("parts") or []: + _walk(sub) + + _walk(payload or {}) + return found + + +def _headerMap(payload: Dict[str, Any]) -> Dict[str, str]: + return { + (h.get("name") or "").lower(): (h.get("value") or "") + for h in (payload.get("headers") or []) + } + + +def _buildContentObjects(message: Dict[str, Any], maxBodyChars: int) -> List[Dict[str, Any]]: + payload = message.get("payload") or {} + headers = _headerMap(payload) + subject = headers.get("subject") or "(no subject)" + fromAddr = headers.get("from") or "" + toAddr = headers.get("to") or "" + ccAddr = headers.get("cc") or "" + date = headers.get("date") or "" + snippet = message.get("snippet") or "" + + bodies = _walkPayloadForBody(payload) + rawBody = bodies["text"] or bodies["html"] + cleanedBody = cleanEmailBody(rawBody, maxChars=maxBodyChars) if rawBody else "" + + parts: List[Dict[str, Any]] = [] + header = ( + f"Subject: {subject}\n" + f"From: {fromAddr}\n" + f"To: {toAddr}\n" + + (f"Cc: {ccAddr}\n" if ccAddr else "") + + f"Date: {date}" + ) + parts.append({ + "contentObjectId": "header", + "contentType": "text", + "data": header, + "contextRef": {"part": "header"}, + }) + if snippet: + parts.append({ + "contentObjectId": "snippet", + "contentType": "text", + "data": snippet, + "contextRef": {"part": "snippet"}, + }) + if cleanedBody: + parts.append({ + "contentObjectId": "body", + "contentType": "text", + "data": cleanedBody, + "contextRef": {"part": "body"}, + }) + return parts + + +async def bootstrapGmail( + connectionId: str, + *, + progressCb: Optional[Callable[[int, Optional[str]], None]] = None, + adapter: Any = None, + connection: Any = None, + knowledgeService: Any = None, + limits: Optional[GmailBootstrapLimits] = None, + googleGetFn: Optional[Callable[..., Any]] = None, +) -> Dict[str, Any]: + """Enumerate Gmail labels (INBOX + SENT default) and ingest messages.""" + limits = limits or GmailBootstrapLimits() + startMs = time.time() + result = GmailBootstrapResult(connectionId=connectionId) + + logger.info( + "ingestion.connection.bootstrap.started part=gmail connectionId=%s", + connectionId, + extra={ + "event": "ingestion.connection.bootstrap.started", + "part": "gmail", + "connectionId": connectionId, + }, + ) + + if adapter is None or knowledgeService is None or connection is None: + adapter, connection, knowledgeService = await _resolveDependencies(connectionId) + + if googleGetFn is None: + from modules.connectors.providerGoogle.connectorGoogle import _googleGet as _defaultGet + + token = getattr(adapter, "_token", "") + + async def googleGetFn(url: str) -> Dict[str, Any]: # 
type: ignore[no-redef] + return await _defaultGet(token, url) + + mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else "" + userId = str(getattr(connection, "userId", "") or "") if connection is not None else "" + + for labelId in limits.labels: + if result.indexed + result.skippedDuplicate >= limits.maxMessages: + break + try: + await _ingestLabel( + googleGetFn=googleGetFn, + knowledgeService=knowledgeService, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + labelId=labelId, + limits=limits, + result=result, + progressCb=progressCb, + ) + except Exception as exc: + logger.error("gmail ingestion label %s failed: %s", labelId, exc, exc_info=True) + result.errors.append(f"label({labelId}): {exc}") + + return _finalizeResult(connectionId, result, startMs) + + +async def _resolveDependencies(connectionId: str): + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.auth import TokenManager + from modules.connectors.providerGoogle.connectorGoogle import GoogleConnector + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext + from modules.security.rootAccess import getRootUser + + rootInterface = getRootInterface() + connection = rootInterface.getUserConnectionById(connectionId) + if connection is None: + raise ValueError(f"UserConnection not found: {connectionId}") + + token = TokenManager().getFreshToken(connectionId) + if not token or not token.tokenAccess: + raise ValueError(f"No valid token for connection {connectionId}") + + provider = GoogleConnector(connection, token.tokenAccess) + adapter = provider.getServiceAdapter("gmail") + + rootUser = getRootUser() + ctx = ServiceCenterContext( + user=rootUser, + mandate_id=str(getattr(connection, "mandateId", "") or ""), + ) + knowledgeService = getService("knowledge", ctx) + return adapter, connection, knowledgeService + + +async def _ingestLabel( + *, + googleGetFn, + knowledgeService, + connectionId: str, + mandateId: str, + userId: str, + labelId: str, + limits: GmailBootstrapLimits, + result: GmailBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + remaining = limits.maxMessages - (result.indexed + result.skippedDuplicate) + if remaining <= 0: + return + + pageSize = min(100, remaining) + query = "" + if limits.maxAgeDays: + cutoff = datetime.now(timezone.utc) - timedelta(days=limits.maxAgeDays) + # Gmail uses YYYY/MM/DD. 
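+        # e.g. maxAgeDays=90 on 2025-06-15 yields "after:2025/03/17".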
+ query = f"after:{cutoff.strftime('%Y/%m/%d')}" + + baseUrl = ( + "https://gmail.googleapis.com/gmail/v1/users/me/messages" + f"?labelIds={labelId}&maxResults={pageSize}" + ) + if query: + baseUrl = f"{baseUrl}&q={query}" + + nextPageToken: Optional[str] = None + while (result.indexed + result.skippedDuplicate) < limits.maxMessages: + url = baseUrl if not nextPageToken else f"{baseUrl}&pageToken={nextPageToken}" + page = await googleGetFn(url) + if not isinstance(page, dict) or "error" in page: + err = (page or {}).get("error") if isinstance(page, dict) else "unknown" + logger.warning("gmail list page error for label %s: %s", labelId, err) + result.errors.append(f"list({labelId}): {err}") + return + + messageStubs = page.get("messages") or [] + for stub in messageStubs: + if result.indexed + result.skippedDuplicate >= limits.maxMessages: + break + msgId = stub.get("id") + if not msgId: + continue + detailUrl = ( + f"https://gmail.googleapis.com/gmail/v1/users/me/messages/{msgId}?format=full" + ) + detail = await googleGetFn(detailUrl) + if not isinstance(detail, dict) or "error" in detail: + result.failed += 1 + continue + await _ingestMessage( + googleGetFn=googleGetFn, + knowledgeService=knowledgeService, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + labelId=labelId, + message=detail, + limits=limits, + result=result, + progressCb=progressCb, + ) + + nextPageToken = page.get("nextPageToken") + if not nextPageToken: + break + + +async def _ingestMessage( + *, + googleGetFn, + knowledgeService, + connectionId: str, + mandateId: str, + userId: str, + labelId: str, + message: Dict[str, Any], + limits: GmailBootstrapLimits, + result: GmailBootstrapResult, + progressCb: Optional[Callable[[int, Optional[str]], None]], +) -> None: + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob + + messageId = message.get("id") + if not messageId: + result.skippedPolicy += 1 + return + revision = message.get("historyId") or message.get("internalDate") + headers = _headerMap(message.get("payload") or {}) + subject = headers.get("subject") or "(no subject)" + syntheticId = _syntheticMessageId(connectionId, messageId) + fileName = f"{subject[:80].strip()}.eml" if subject else f"{messageId}.eml" + + contentObjects = _buildContentObjects(message, limits.maxBodyChars) + try: + handle = await knowledgeService.requestIngestion( + IngestionJob( + sourceKind="gmail_message", + sourceId=syntheticId, + fileName=fileName, + mimeType="message/rfc822", + userId=userId, + mandateId=mandateId, + contentObjects=contentObjects, + contentVersion=str(revision) if revision else None, + provenance={ + "connectionId": connectionId, + "authority": "google", + "service": "gmail", + "externalItemId": messageId, + "label": labelId, + "threadId": message.get("threadId"), + "tier": "body", + }, + ) + ) + except Exception as exc: + logger.error("gmail ingestion %s failed: %s", messageId, exc, exc_info=True) + result.failed += 1 + result.errors.append(f"ingest({messageId}): {exc}") + return + + if handle.status == "duplicate": + result.skippedDuplicate += 1 + elif handle.status == "indexed": + result.indexed += 1 + else: + result.failed += 1 + + if limits.includeAttachments: + try: + await _ingestAttachments( + googleGetFn=googleGetFn, + knowledgeService=knowledgeService, + connectionId=connectionId, + mandateId=mandateId, + userId=userId, + message=message, + parentSyntheticId=syntheticId, + limits=limits, + result=result, + ) + except Exception as exc: + 
logger.warning("gmail attachments %s failed: %s", messageId, exc) + result.errors.append(f"attachments({messageId}): {exc}") + + if progressCb is not None and (result.indexed + result.skippedDuplicate) % 50 == 0: + processed = result.indexed + result.skippedDuplicate + try: + progressCb( + min(90, 10 + int(80 * processed / max(1, limits.maxMessages))), + f"gmail processed={processed}", + ) + except Exception: + pass + logger.info( + "ingestion.connection.bootstrap.progress part=gmail processed=%d skippedDup=%d failed=%d", + processed, result.skippedDuplicate, result.failed, + extra={ + "event": "ingestion.connection.bootstrap.progress", + "part": "gmail", + "connectionId": connectionId, + "processed": processed, + "skippedDup": result.skippedDuplicate, + "failed": result.failed, + }, + ) + + await asyncio.sleep(0) + + +async def _ingestAttachments( + *, + googleGetFn, + knowledgeService, + connectionId: str, + mandateId: str, + userId: str, + message: Dict[str, Any], + parentSyntheticId: str, + limits: GmailBootstrapLimits, + result: GmailBootstrapResult, +) -> None: + """Child ingestion jobs for file attachments. Skips inline images (cid: refs).""" + from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob + from modules.datamodels.datamodelExtraction import ExtractionOptions + from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction + from modules.serviceCenter.services.serviceExtraction.subRegistry import ( + ExtractorRegistry, ChunkerRegistry, + ) + + messageId = message.get("id") or "" + + def _collectAttachmentStubs(part: Dict[str, Any], acc: List[Dict[str, Any]]) -> None: + filename = part.get("filename") or "" + body = part.get("body") or {} + attId = body.get("attachmentId") + if filename and attId: + acc.append({ + "filename": filename, + "mimeType": part.get("mimeType") or "application/octet-stream", + "attachmentId": attId, + "size": int(body.get("size") or 0), + }) + for sub in part.get("parts") or []: + _collectAttachmentStubs(sub, acc) + + stubs: List[Dict[str, Any]] = [] + _collectAttachmentStubs(message.get("payload") or {}, stubs) + if not stubs: + return + + extractorRegistry = ExtractorRegistry() + chunkerRegistry = ChunkerRegistry() + + for stub in stubs: + if stub["size"] and stub["size"] > limits.maxAttachmentBytes: + result.skippedPolicy += 1 + continue + attUrl = ( + f"https://gmail.googleapis.com/gmail/v1/users/me/messages/{messageId}" + f"/attachments/{stub['attachmentId']}" + ) + detail = await googleGetFn(attUrl) + if not isinstance(detail, dict) or "error" in detail: + result.failed += 1 + continue + rawBytes = _decodeBase64Url(detail.get("data") or "") + if not rawBytes: + continue + fileName = stub["filename"] + mimeType = stub["mimeType"] + syntheticId = _syntheticAttachmentId(connectionId, messageId, stub["attachmentId"]) + + try: + extracted = runExtraction( + extractorRegistry, chunkerRegistry, + rawBytes, fileName, mimeType, + ExtractionOptions(mergeStrategy=None), + ) + except Exception as exc: + logger.warning("gmail attachment extract %s failed: %s", stub["attachmentId"], exc) + result.failed += 1 + continue + + contentObjects: List[Dict[str, Any]] = [] + for part in getattr(extracted, "parts", None) or []: + data = getattr(part, "data", None) or "" + if not data or not str(data).strip(): + continue + typeGroup = getattr(part, "typeGroup", "text") or "text" + contentType = "text" + if typeGroup == "image": + contentType = "image" + elif typeGroup in ("binary", "container"): + 
contentType = "other" + contentObjects.append({ + "contentObjectId": getattr(part, "id", ""), + "contentType": contentType, + "data": data, + "contextRef": { + "containerPath": fileName, + "location": getattr(part, "label", None) or "attachment", + **(getattr(part, "metadata", None) or {}), + }, + }) + if not contentObjects: + result.skippedPolicy += 1 + continue + + try: + await knowledgeService.requestIngestion( + IngestionJob( + sourceKind="gmail_attachment", + sourceId=syntheticId, + fileName=fileName, + mimeType=mimeType, + userId=userId, + mandateId=mandateId, + contentObjects=contentObjects, + provenance={ + "connectionId": connectionId, + "authority": "google", + "service": "gmail", + "parentId": parentSyntheticId, + "externalItemId": stub["attachmentId"], + "parentMessageId": messageId, + }, + ) + ) + result.attachmentsIndexed += 1 + except Exception as exc: + logger.warning("gmail attachment ingest %s failed: %s", stub["attachmentId"], exc) + result.failed += 1 + + +def _finalizeResult(connectionId: str, result: GmailBootstrapResult, startMs: float) -> Dict[str, Any]: + durationMs = int((time.time() - startMs) * 1000) + logger.info( + "ingestion.connection.bootstrap.done part=gmail connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d attachments=%d failed=%d durationMs=%d", + connectionId, + result.indexed, result.skippedDuplicate, result.skippedPolicy, + result.attachmentsIndexed, result.failed, durationMs, + extra={ + "event": "ingestion.connection.bootstrap.done", + "part": "gmail", + "connectionId": connectionId, + "indexed": result.indexed, + "skippedDup": result.skippedDuplicate, + "skippedPolicy": result.skippedPolicy, + "attachmentsIndexed": result.attachmentsIndexed, + "failed": result.failed, + "durationMs": durationMs, + }, + ) + return { + "connectionId": result.connectionId, + "indexed": result.indexed, + "skippedDuplicate": result.skippedDuplicate, + "skippedPolicy": result.skippedPolicy, + "attachmentsIndexed": result.attachmentsIndexed, + "failed": result.failed, + "durationMs": durationMs, + "errors": result.errors[:20], + } diff --git a/tests/unit/services/test_bootstrap_clickup.py b/tests/unit/services/test_bootstrap_clickup.py new file mode 100644 index 00000000..87c08c3d --- /dev/null +++ b/tests/unit/services/test_bootstrap_clickup.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Bootstrap ClickUp tests with a fake service + knowledge service. + +Verifies: +- Teams → spaces → lists (folderless + folder-based) → tasks traversal. +- Each task produces a `requestIngestion` call with `sourceKind="clickup_task"` + and header + description content-objects. +- `date_updated` is forwarded as contentVersion → idempotency. +- Recency filter drops tasks older than `maxAgeDays`. +- maxWorkspaces / maxListsPerWorkspace / maxTasks caps are respected. 
+""" + +import asyncio +import os +import sys +import time +from types import SimpleNamespace + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) + +from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncClickup import ( + bootstrapClickup, + ClickupBootstrapLimits, + _syntheticTaskId, +) + + +def _nowMs(offsetDays: int = 0) -> str: + return str(int((time.time() + offsetDays * 86400) * 1000)) + + +class _FakeClickupService: + """Records API calls; serves a canned 1-team / 1-space / 1-list / 2-task layout.""" + + def __init__(self, taskCount=2, oldTask=False): + self._taskCount = taskCount + self._oldTask = oldTask # when True, the second task is 400 days old + self.calls = [] + + async def getAuthorizedTeams(self): + self.calls.append(("getAuthorizedTeams",)) + return {"teams": [{"id": "team-1", "name": "Acme"}]} + + async def getSpaces(self, team_id: str): + self.calls.append(("getSpaces", team_id)) + return {"spaces": [{"id": "space-1", "name": "Engineering"}]} + + async def getFolderlessLists(self, space_id: str): + self.calls.append(("getFolderlessLists", space_id)) + return {"lists": [{"id": "list-1", "name": "Sprint 1"}]} + + async def getFolders(self, space_id: str): + self.calls.append(("getFolders", space_id)) + return {"folders": [{"id": "folder-1", "name": "Subproject"}]} + + async def getListsInFolder(self, folder_id: str): + self.calls.append(("getListsInFolder", folder_id)) + return {"lists": [{"id": "list-2", "name": "Sub-tasks"}]} + + async def getTasksInList(self, list_id: str, *, page=0, include_closed=False, subtasks=True): + self.calls.append(("getTasksInList", list_id, page, include_closed)) + if page > 0: + return {"tasks": []} + tasks = [] + for i in range(self._taskCount): + tid = f"{list_id}-task-{i}" + offsetDays = -400 if (self._oldTask and i == 1) else 0 + tasks.append({ + "id": tid, + "name": f"Task {i} of {list_id}", + "description": f"Plain description for task {i}", + "text_content": f"Rich content for task {i}", + "status": {"status": "open" if i == 0 else "closed"}, + "assignees": [{"username": "alice"}], + "tags": [{"name": "urgent"}], + "date_updated": _nowMs(offsetDays), + "date_created": _nowMs(-1), + "url": f"https://app.clickup.com/t/{tid}", + }) + return {"tasks": tasks} + + +class _FakeKnowledgeService: + def __init__(self, duplicateIds=None): + self.calls = [] + self._duplicates = duplicateIds or set() + + async def requestIngestion(self, job): + self.calls.append(job) + status = "duplicate" if job.sourceId in self._duplicates else "indexed" + return SimpleNamespace( + jobId=job.sourceId, status=status, contentHash="h", + fileId=job.sourceId, index=None, error=None, + ) + + +def _adapter(svc): + return SimpleNamespace(_svc=svc) + + +def test_bootstrap_walks_team_space_lists_and_tasks(): + svc = _FakeClickupService(taskCount=2) + knowledge = _FakeKnowledgeService() + connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapClickup( + connectionId="c1", + adapter=_adapter(svc), + connection=connection, + knowledgeService=knowledge, + limits=ClickupBootstrapLimits(maxAgeDays=None), + ) + + result = asyncio.run(_run()) + # 2 lists (folderless list-1 + folder's list-2) × 2 tasks each = 4 tasks + assert result["indexed"] == 4 + assert result["workspaces"] == 1 + assert result["lists"] == 2 + sourceIds = {c.sourceId for c in knowledge.calls} + assert len(sourceIds) == 4 + for job in knowledge.calls: + assert job.sourceKind == "clickup_task" + assert job.mimeType == 
"application/vnd.clickup.task+json" + assert job.mandateId == "m1" + assert job.provenance["connectionId"] == "c1" + assert job.provenance["authority"] == "clickup" + assert job.provenance["teamId"] == "team-1" + assert job.contentVersion # numeric millisecond string + # At least the header content-object is present. + ids = [co["contentObjectId"] for co in job.contentObjects] + assert "header" in ids + + +def test_bootstrap_reports_duplicates_on_second_run(): + svc = _FakeClickupService(taskCount=1) + duplicates = { + _syntheticTaskId("c1", "list-1-task-0"), + _syntheticTaskId("c1", "list-2-task-0"), + } + knowledge = _FakeKnowledgeService(duplicateIds=duplicates) + connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapClickup( + connectionId="c1", + adapter=_adapter(svc), + connection=connection, + knowledgeService=knowledge, + limits=ClickupBootstrapLimits(maxAgeDays=None), + ) + + result = asyncio.run(_run()) + assert result["indexed"] == 0 + assert result["skippedDuplicate"] == 2 + + +def test_bootstrap_skips_tasks_older_than_maxAgeDays(): + svc = _FakeClickupService(taskCount=2, oldTask=True) + knowledge = _FakeKnowledgeService() + connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapClickup( + connectionId="c1", + adapter=_adapter(svc), + connection=connection, + knowledgeService=knowledge, + limits=ClickupBootstrapLimits(maxAgeDays=180), + ) + + result = asyncio.run(_run()) + # 2 lists × (1 recent + 1 skipped old) = 2 indexed + 2 skippedPolicy + assert result["indexed"] == 2 + assert result["skippedPolicy"] == 2 + + +def test_bootstrap_maxTasks_caps_ingestion(): + svc = _FakeClickupService(taskCount=2) + knowledge = _FakeKnowledgeService() + connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapClickup( + connectionId="c1", + adapter=_adapter(svc), + connection=connection, + knowledgeService=knowledge, + limits=ClickupBootstrapLimits(maxAgeDays=None, maxTasks=3), + ) + + result = asyncio.run(_run()) + assert result["indexed"] == 3 + + +if __name__ == "__main__": + test_bootstrap_walks_team_space_lists_and_tasks() + test_bootstrap_reports_duplicates_on_second_run() + test_bootstrap_skips_tasks_older_than_maxAgeDays() + test_bootstrap_maxTasks_caps_ingestion() + print("OK — bootstrapClickup tests passed") diff --git a/tests/unit/services/test_bootstrap_gdrive.py b/tests/unit/services/test_bootstrap_gdrive.py new file mode 100644 index 00000000..1b88677e --- /dev/null +++ b/tests/unit/services/test_bootstrap_gdrive.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Bootstrap Google Drive tests with a fake adapter + knowledge service. + +Verifies: +- Drive walk traverses root → subfolders, respecting `maxDepth`. +- Every file triggers `requestIngestion` with `sourceKind="gdrive_item"`. +- Duplicate runs (same modifiedTime revision) report `skippedDuplicate`. +- Provenance carries `authority="google"` and the Drive file id. +- Recency filter skips files older than `maxAgeDays`. 
+""" + +import asyncio +import os +import sys +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace +from typing import Any, Dict, List, Optional + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) + +from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGdrive import ( + bootstrapGdrive, + GdriveBootstrapLimits, + _syntheticFileId, +) + + +@dataclass +class _ExtEntry: + name: str + path: str + isFolder: bool = False + size: Optional[int] = None + mimeType: Optional[str] = None + metadata: Dict[str, Any] = None + + +def _today_iso(offsetDays: int = 0) -> str: + return (datetime.now(timezone.utc) + timedelta(days=offsetDays)).strftime("%Y-%m-%dT%H:%M:%SZ") + + +class _FakeDriveAdapter: + """Minimal DriveAdapter stand-in. + + Layout: + "/" (root) → 2 files + 1 folder (sub) + "/sub_id" → 1 file + """ + + def __init__(self, recent_only: bool = True): + self.downloaded: List[str] = [] + self._recent = _today_iso(0) + self._old = _today_iso(-400) + self._recent_only = recent_only + + async def browse(self, path: str, filter=None, limit=None): + if path in ("/", "", "root"): + return [ + _ExtEntry( + name="f1.txt", path="/f1", size=20, + mimeType="text/plain", + metadata={"id": "f1", "modifiedTime": self._recent}, + ), + _ExtEntry( + name="f2.txt", path="/f2", size=20, + mimeType="text/plain", + metadata={"id": "f2", "modifiedTime": self._recent if self._recent_only else self._old}, + ), + _ExtEntry( + name="Subfolder", path="/sub_id", isFolder=True, + mimeType="application/vnd.google-apps.folder", + metadata={"id": "sub_id", "modifiedTime": self._recent}, + ), + ] + if path == "/sub_id": + return [ + _ExtEntry( + name="f3.txt", path="/f3", size=20, + mimeType="text/plain", + metadata={"id": "f3", "modifiedTime": self._recent}, + ), + ] + return [] + + async def download(self, path: str) -> bytes: + self.downloaded.append(path) + return path.encode("utf-8") + + +class _FakeKnowledgeService: + def __init__(self, duplicateIds=None): + self.calls: List[SimpleNamespace] = [] + self._duplicateIds = duplicateIds or set() + + async def requestIngestion(self, job): + self.calls.append(job) + status = "duplicate" if job.sourceId in self._duplicateIds else "indexed" + return SimpleNamespace( + jobId=f"{job.sourceKind}:{job.sourceId}", + status=status, contentHash="h", + fileId=job.sourceId, index=None, error=None, + ) + + +def _fakeRunExtraction(data, name, mime, options): + return SimpleNamespace( + parts=[ + SimpleNamespace( + id="p1", + data=data.decode("utf-8") if isinstance(data, bytes) else str(data), + typeGroup="text", + label="page:1", + metadata={"pageIndex": 0}, + ) + ] + ) + + +def test_bootstrap_walks_drive_and_subfolders(): + adapter = _FakeDriveAdapter() + knowledge = _FakeKnowledgeService() + connection = SimpleNamespace(mandateId="m1", userId="u1") + + async def _run(): + return await bootstrapGdrive( + connectionId="c1", + adapter=adapter, + connection=connection, + knowledgeService=knowledge, + runExtractionFn=_fakeRunExtraction, + limits=GdriveBootstrapLimits(maxAgeDays=None), + ) + + result = asyncio.run(_run()) + assert len(knowledge.calls) == 3 + sourceIds = {c.sourceId for c in knowledge.calls} + assert sourceIds == { + _syntheticFileId("c1", "f1"), + _syntheticFileId("c1", "f2"), + _syntheticFileId("c1", "f3"), + } + assert result["indexed"] == 3 + assert result["skippedDuplicate"] == 0 + assert adapter.downloaded == ["/f1", "/f2", "/f3"] + + +def 
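+
+
+# --- Illustrative sketch (not part of the module under test) ----------------
+# The fake's "duplicate" answer models the idempotency contract the next test
+# exercises: a sourceId whose contentVersion (here, the Drive modifiedTime)
+# is unchanged must not be re-indexed. A minimal seen-store implementing that
+# check could look like this (names are assumptions, for illustration only):
+class _SeenStoreSketch:
+    def __init__(self):
+        self._seen = {}  # sourceId -> last ingested contentVersion
+
+    def isDuplicate(self, sourceId: str, contentVersion: str) -> bool:
+        if self._seen.get(sourceId) == contentVersion:
+            return True  # same revision already ingested → skippedDuplicate
+        self._seen[sourceId] = contentVersion  # new file or new revision
+        return False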
+
+
+def test_bootstrap_reports_duplicates_on_second_run():
+    adapter = _FakeDriveAdapter()
+    duplicateIds = {
+        _syntheticFileId("c1", "f1"),
+        _syntheticFileId("c1", "f2"),
+        _syntheticFileId("c1", "f3"),
+    }
+    knowledge = _FakeKnowledgeService(duplicateIds=duplicateIds)
+    connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+    async def _run():
+        return await bootstrapGdrive(
+            connectionId="c1",
+            adapter=adapter,
+            connection=connection,
+            knowledgeService=knowledge,
+            runExtractionFn=_fakeRunExtraction,
+            limits=GdriveBootstrapLimits(maxAgeDays=None),
+        )
+
+    result = asyncio.run(_run())
+    assert result["indexed"] == 0
+    assert result["skippedDuplicate"] == 3
+
+
+def test_bootstrap_skips_files_older_than_maxAgeDays():
+    adapter = _FakeDriveAdapter(recent_only=False)  # f2 is 400 days old
+    knowledge = _FakeKnowledgeService()
+    connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+    async def _run():
+        return await bootstrapGdrive(
+            connectionId="c1",
+            adapter=adapter,
+            connection=connection,
+            knowledgeService=knowledge,
+            runExtractionFn=_fakeRunExtraction,
+            limits=GdriveBootstrapLimits(maxAgeDays=180),
+        )
+
+    result = asyncio.run(_run())
+    assert result["indexed"] == 2  # f1, f3
+    assert result["skippedPolicy"] == 1  # f2 filtered out
+
+
+def test_bootstrap_passes_connection_provenance():
+    adapter = _FakeDriveAdapter()
+    knowledge = _FakeKnowledgeService()
+    connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+    async def _run():
+        return await bootstrapGdrive(
+            connectionId="c1",
+            adapter=adapter,
+            connection=connection,
+            knowledgeService=knowledge,
+            runExtractionFn=_fakeRunExtraction,
+            limits=GdriveBootstrapLimits(maxAgeDays=None),
+        )
+
+    asyncio.run(_run())
+    for job in knowledge.calls:
+        assert job.sourceKind == "gdrive_item"
+        assert job.mandateId == "m1"
+        assert job.provenance["connectionId"] == "c1"
+        assert job.provenance["authority"] == "google"
+        assert job.provenance["service"] == "drive"
+        assert job.contentVersion  # modifiedTime ISO string
+
+
+if __name__ == "__main__":
+    test_bootstrap_walks_drive_and_subfolders()
+    test_bootstrap_reports_duplicates_on_second_run()
+    test_bootstrap_skips_files_older_than_maxAgeDays()
+    test_bootstrap_passes_connection_provenance()
+    print("OK — bootstrapGdrive tests passed")
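
The gdrive tests above drive the `maxAgeDays` policy purely through ISO-8601 `modifiedTime` strings. The module's actual filter is not part of this diff; a minimal sketch of such a recency check, assuming the `%Y-%m-%dT%H:%M:%SZ` shape the fake emits (function name is an assumption), might be:

    from datetime import datetime, timedelta, timezone
    from typing import Optional

    def isRecentIso(modifiedTime: str, maxAgeDays: Optional[int]) -> bool:
        if not maxAgeDays or not modifiedTime:
            return True  # filter disabled, or nothing to judge by
        try:
            ts = datetime.strptime(modifiedTime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
        except ValueError:
            return True  # an unparseable timestamp is not grounds for skipping
        return ts >= datetime.now(timezone.utc) - timedelta(days=maxAgeDays)

Note the fail-open choices: a missing or malformed timestamp keeps the file, matching how the tests only ever expect `skippedPolicy` for genuinely old entries.
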
diff --git a/tests/unit/services/test_bootstrap_gmail.py b/tests/unit/services/test_bootstrap_gmail.py
new file mode 100644
index 00000000..4f7cfe4d
--- /dev/null
+++ b/tests/unit/services/test_bootstrap_gmail.py
@@ -0,0 +1,240 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Bootstrap Gmail tests with a fake googleGet + knowledge service.
+
+Verifies:
+- Default labels (INBOX + SENT) are traversed.
+- Each message produces a requestIngestion call with sourceKind=gmail_message
+  and structured contentObjects (header / snippet / body).
+- Pagination via `nextPageToken` is followed.
+- historyId is forwarded as contentVersion → idempotency.
+- MIME body extraction walks nested parts (multipart/alternative).
+"""
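+
+# For orientation, the job shape these tests assert against (field names are
+# taken from the assertions below; values here are illustrative only):
+#
+#     sourceKind     = "gmail_message"
+#     mimeType       = "message/rfc822"
+#     contentVersion = "<historyId>"          # Gmail bumps this per mutation
+#     provenance     = {"connectionId": ..., "authority": "google",
+#                       "service": "gmail", ...}
+#     contentObjects = [{"contentObjectId": "header", ...},
+#                       {"contentObjectId": "snippet", ...},
+#                       {"contentObjectId": "body", ...}]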
+
+import asyncio
+import base64
+import os
+import sys
+from types import SimpleNamespace
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
+
+from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGmail import (
+    bootstrapGmail,
+    GmailBootstrapLimits,
+    _syntheticMessageId,
+    _buildContentObjects,
+    _walkPayloadForBody,
+)
+
+
+def _b64url(text: str) -> str:
+    return base64.urlsafe_b64encode(text.encode("utf-8")).decode("ascii").rstrip("=")
+
+
+def _msg(mid: str, subject: str = "Hi", body: str = "Hello world", historyId: str = "h1"):
+    return {
+        "id": mid,
+        "threadId": f"thread-{mid}",
+        "historyId": historyId,
+        "internalDate": "1700000000000",
+        "snippet": body[:120],
+        "payload": {
+            "headers": [
+                {"name": "Subject", "value": subject},
+                {"name": "From", "value": "Alice <alice@example.com>"},
+                {"name": "To", "value": "Bob <bob@example.com>"},
+                {"name": "Date", "value": "Tue, 01 Jan 2025 10:00:00 +0000"},
+            ],
+            "mimeType": "text/plain",
+            "body": {"data": _b64url(body), "size": len(body)},
+            "parts": [],
+        },
+    }
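+
+
+# --- Illustrative sketch (not part of the module under test) ----------------
+# `_b64url` strips the trailing "=" padding, mirroring how Gmail serves
+# `body.data`. Any decoder therefore has to restore padding to a multiple of
+# four before calling urlsafe_b64decode, roughly like this:
+def _b64urlDecodeSketch(data: str) -> str:
+    padded = data + "=" * (-len(data) % 4)  # re-pad; b64 length must be 4-aligned
+    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")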
+
+
+class _FakeGoogleGet:
+    """Records URLs + returns the wired-up page or message response."""
+
+    def __init__(self, messages_by_label, paginated_label=None, page2=None):
+        self._messages = messages_by_label
+        self._paginated = paginated_label
+        self._page2 = page2 or []
+        self._served_first_page = set()
+        self.requested = []
+
+    async def __call__(self, url: str):
+        self.requested.append(url)
+        # List page: contains `/users/me/messages?labelIds=...`
+        if "/users/me/messages?" in url:
+            for label, msgs in self._messages.items():
+                if f"labelIds={label}" in url:
+                    if (
+                        label == self._paginated
+                        and label not in self._served_first_page
+                    ):
+                        self._served_first_page.add(label)
+                        return {
+                            "messages": [{"id": m["id"]} for m in msgs],
+                            "nextPageToken": "token-2",
+                        }
+                    if label == self._paginated and "pageToken=token-2" in url:
+                        return {
+                            "messages": [{"id": m["id"]} for m in self._page2],
+                        }
+                    return {"messages": [{"id": m["id"]} for m in msgs]}
+            return {"messages": []}
+        # Detail fetch: /users/me/messages/{id}?format=full
+        if "/users/me/messages/" in url and "format=full" in url:
+            msgId = url.split("/users/me/messages/")[-1].split("?")[0]
+            for msgs in self._messages.values():
+                for m in msgs:
+                    if m["id"] == msgId:
+                        return m
+            for m in self._page2:
+                if m["id"] == msgId:
+                    return m
+            return {"error": "not found"}
+
+
+class _FakeKnowledgeService:
+    def __init__(self, duplicateIds=None):
+        self.calls = []
+        self._duplicates = duplicateIds or set()
+
+    async def requestIngestion(self, job):
+        self.calls.append(job)
+        status = "duplicate" if job.sourceId in self._duplicates else "indexed"
+        return SimpleNamespace(
+            jobId=job.sourceId, status=status, contentHash="h",
+            fileId=job.sourceId, index=None, error=None,
+        )
+
+
+def test_buildContentObjects_emits_header_snippet_body():
+    parts = _buildContentObjects(_msg("m1", body="Hello\nWorld"), maxBodyChars=8000)
+    ids = [p["contentObjectId"] for p in parts]
+    assert ids == ["header", "snippet", "body"]
+    header = parts[0]["data"]
+    assert "Subject: Hi" in header
+    assert "From: Alice <alice@example.com>" in header
+    assert "To: Bob <bob@example.com>" in header
+
+
+def test_walkPayloadForBody_prefers_plain_over_html():
+    payload = {
+        "mimeType": "multipart/alternative",
+        "parts": [
+            {"mimeType": "text/plain", "body": {"data": _b64url("plain body")}},
+            {"mimeType": "text/html", "body": {"data": _b64url("<div>html body</div>")}},
+        ],
+    }
+    bodies = _walkPayloadForBody(payload)
+    assert bodies["text"] == "plain body"
+    assert bodies["html"] == "<div>html body</div>"
+
+
+def test_walkPayloadForBody_falls_back_to_html():
+    payload = {
+        "mimeType": "multipart/alternative",
+        "parts": [
+            {"mimeType": "text/html", "body": {"data": _b64url("<div>only html</div>")}},
+        ],
+    }
+    bodies = _walkPayloadForBody(payload)
+    assert bodies["text"] == ""
+    assert "only html" in bodies["html"]
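+
+
+# --- Illustrative sketch (not part of the module under test) ----------------
+# The real `_walkPayloadForBody` lives in subConnectorSyncGmail and is not in
+# this diff. Consistent with the two tests above, it has to recurse through
+# nested `parts`, keep the first text/plain and text/html bodies it finds, and
+# decode them padding-aware (see `_b64urlDecodeSketch` above). Roughly:
+def _walkPayloadSketch(payload: dict, bodies: dict = None) -> dict:
+    bodies = bodies if bodies is not None else {"text": "", "html": ""}
+    data = (payload.get("body") or {}).get("data")
+    mime = payload.get("mimeType") or ""
+    if data and mime == "text/plain" and not bodies["text"]:
+        bodies["text"] = _b64urlDecodeSketch(data)
+    elif data and mime == "text/html" and not bodies["html"]:
+        bodies["html"] = _b64urlDecodeSketch(data)
+    for part in payload.get("parts") or []:
+        _walkPayloadSketch(part, bodies)  # depth-first into multipart/*
+    return bodies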
+
+
+def test_bootstrap_gmail_indexes_messages_from_inbox_and_sent():
+    fake_get = _FakeGoogleGet({
+        "INBOX": [_msg("m1"), _msg("m2")],
+        "SENT": [_msg("m3")],
+    })
+    knowledge = _FakeKnowledgeService()
+    connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+    async def _run():
+        return await bootstrapGmail(
+            connectionId="c1",
+            adapter=SimpleNamespace(_token="t"),
+            connection=connection,
+            knowledgeService=knowledge,
+            limits=GmailBootstrapLimits(maxAgeDays=None),
+            googleGetFn=fake_get,
+        )
+
+    result = asyncio.run(_run())
+    assert result["indexed"] == 3
+    sourceIds = {c.sourceId for c in knowledge.calls}
+    assert sourceIds == {
+        _syntheticMessageId("c1", "m1"),
+        _syntheticMessageId("c1", "m2"),
+        _syntheticMessageId("c1", "m3"),
+    }
+    for job in knowledge.calls:
+        assert job.sourceKind == "gmail_message"
+        assert job.mimeType == "message/rfc822"
+        assert job.provenance["connectionId"] == "c1"
+        assert job.provenance["authority"] == "google"
+        assert job.provenance["service"] == "gmail"
+        assert job.contentVersion == "h1"
+        assert any(co["contentObjectId"] == "header" for co in job.contentObjects)
+
+
+def test_bootstrap_gmail_follows_pagination():
+    fake_get = _FakeGoogleGet(
+        messages_by_label={"INBOX": [_msg("m1")], "SENT": []},
+        paginated_label="INBOX",
+        page2=[_msg("m2"), _msg("m3")],
+    )
+    knowledge = _FakeKnowledgeService()
+    connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+    async def _run():
+        return await bootstrapGmail(
+            connectionId="c1",
+            adapter=SimpleNamespace(_token="t"),
+            connection=connection,
+            knowledgeService=knowledge,
+            limits=GmailBootstrapLimits(maxAgeDays=None),
+            googleGetFn=fake_get,
+        )
+
+    result = asyncio.run(_run())
+    assert result["indexed"] == 3
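+
+
+# --- Illustrative sketch (not part of the module under test) ----------------
+# The pagination test above relies on the bootstrap following `nextPageToken`
+# until the API stops returning one. The loop shape (URL details and the
+# function name are assumptions; the fetcher is injected like `googleGetFn`):
+async def _listAllMessageIdsSketch(googleGetFn, label: str, base: str) -> list:
+    ids, token = [], None
+    while True:
+        url = f"{base}/users/me/messages?labelIds={label}"
+        if token:
+            url += f"&pageToken={token}"
+        page = await googleGetFn(url)
+        ids.extend(m["id"] for m in page.get("messages") or [])
+        token = page.get("nextPageToken")
+        if not token:
+            return ids  # last page reached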
+
+
+def test_bootstrap_gmail_reports_duplicates():
+    fake_get = _FakeGoogleGet({"INBOX": [_msg("m1"), _msg("m2")], "SENT": []})
+    duplicates = {
+        _syntheticMessageId("c1", "m1"),
+        _syntheticMessageId("c1", "m2"),
+    }
+    knowledge = _FakeKnowledgeService(duplicateIds=duplicates)
+    connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+    async def _run():
+        return await bootstrapGmail(
+            connectionId="c1",
+            adapter=SimpleNamespace(_token="t"),
+            connection=connection,
+            knowledgeService=knowledge,
+            limits=GmailBootstrapLimits(maxAgeDays=None),
+            googleGetFn=fake_get,
+        )
+
+    result = asyncio.run(_run())
+    assert result["indexed"] == 0
+    assert result["skippedDuplicate"] == 2
+
+
+if __name__ == "__main__":
+    test_buildContentObjects_emits_header_snippet_body()
+    test_walkPayloadForBody_prefers_plain_over_html()
+    test_walkPayloadForBody_falls_back_to_html()
+    test_bootstrap_gmail_indexes_messages_from_inbox_and_sent()
+    test_bootstrap_gmail_follows_pagination()
+    test_bootstrap_gmail_reports_duplicates()
+    print("OK — bootstrapGmail tests passed")
diff --git a/tests/unit/services/test_knowledge_ingest_consumer.py b/tests/unit/services/test_knowledge_ingest_consumer.py
index 760e1ed6..6b27a6e8 100644
--- a/tests/unit/services/test_knowledge_ingest_consumer.py
+++ b/tests/unit/services/test_knowledge_ingest_consumer.py
@@ -99,17 +99,18 @@ def test_onConnectionRevoked_ignores_missing_id(monkeypatch):
     assert seen == []
 
 
-def test_bootstrap_job_skips_non_pilot_authority(monkeypatch):
+def test_bootstrap_job_skips_unsupported_authority(monkeypatch):
     async def _run():
         result = await consumer._bootstrapJobHandler(
-            {"payload": {"connectionId": "c1", "authority": "google"}},
+            {"payload": {"connectionId": "c1", "authority": "slack"}},
             lambda *_: None,
         )
         return result
 
     result = asyncio.run(_run())
     assert result["skipped"] is True
-    assert result["authority"] == "google"
+    assert result["authority"] == "slack"
+    assert result["reason"] == "unsupported_authority"
 
 
 def test_bootstrap_job_dispatches_msft_parts(monkeypatch):
@@ -123,8 +124,6 @@ def test_bootstrap_job_dispatches_msft_parts(monkeypatch):
         calls["ol"] += 1
         return {"indexed": 2}
 
-    # subConnectorSync* are lazy-imported inside the handler; install fake
-    # modules before invoking.
     fakeSharepoint = types.ModuleType("subConnectorSyncSharepoint")
     fakeSharepoint.bootstrapSharepoint = _fakeSp
     fakeOutlook = types.ModuleType("subConnectorSyncOutlook")
@@ -152,6 +151,70 @@ def test_bootstrap_job_dispatches_msft_parts(monkeypatch):
     assert result["outlook"] == {"indexed": 2}
 
 
+def test_bootstrap_job_dispatches_google_parts(monkeypatch):
+    calls = {"gd": 0, "gm": 0}
+
+    async def _fakeGd(connectionId, progressCb=None):
+        calls["gd"] += 1
+        return {"indexed": 7}
+
+    async def _fakeGm(connectionId, progressCb=None):
+        calls["gm"] += 1
+        return {"indexed": 11}
+
+    fakeGdrive = types.ModuleType("subConnectorSyncGdrive")
+    fakeGdrive.bootstrapGdrive = _fakeGd
+    fakeGmail = types.ModuleType("subConnectorSyncGmail")
+    fakeGmail.bootstrapGmail = _fakeGm
+    monkeypatch.setitem(
+        sys.modules,
+        "modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGdrive",
+        fakeGdrive,
+    )
+    monkeypatch.setitem(
+        sys.modules,
+        "modules.serviceCenter.services.serviceKnowledge.subConnectorSyncGmail",
+        fakeGmail,
+    )
+
+    async def _run():
+        return await consumer._bootstrapJobHandler(
+            {"payload": {"connectionId": "c1", "authority": "google"}},
+            lambda *_: None,
+        )
+
+    result = asyncio.run(_run())
+    assert calls == {"gd": 1, "gm": 1}
+    assert result["drive"] == {"indexed": 7}
+    assert result["gmail"] == {"indexed": 11}
+
+
+def test_bootstrap_job_dispatches_clickup_part(monkeypatch):
+    calls = {"cu": 0}
+
+    async def _fakeCu(connectionId, progressCb=None):
+        calls["cu"] += 1
+        return {"indexed": 4}
+
+    fakeClickup = types.ModuleType("subConnectorSyncClickup")
+    fakeClickup.bootstrapClickup = _fakeCu
+    monkeypatch.setitem(
+        sys.modules,
+        "modules.serviceCenter.services.serviceKnowledge.subConnectorSyncClickup",
+        fakeClickup,
+    )
+
+    async def _run():
+        return await consumer._bootstrapJobHandler(
+            {"payload": {"connectionId": "c1", "authority": "clickup"}},
+            lambda *_: None,
+        )
+
+    result = asyncio.run(_run())
+    assert calls == {"cu": 1}
+    assert result["clickup"] == {"indexed": 4}
+
+
 if __name__ == "__main__":
     # Usable without pytest fixtures for a quick smoke run.
     class _MP: