From 48c0f900af160716ffdecd9e3293b15ac289835a Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 12 May 2026 15:19:01 +0200
Subject: [PATCH] rag: per-DataSource index consent, RAG inventory API, cancellable bootstrap jobs
---
app.py | 3 +
env-gateway-dev.env | 7 -
env-gateway-int.env | 7 -
env-gateway-prod-forgejo.env | 11 +-
env-gateway-prod.env | 7 -
modules/datamodels/datamodelDataSource.py | 12 +-
modules/datamodels/datamodelUam.py | 8 +-
modules/features/workspace/mainWorkspace.py | 7 -
.../workspace/routeFeatureWorkspace.py | 45 ---
modules/interfaces/interfaceDbKnowledge.py | 54 ++++
modules/routes/routeBilling.py | 25 +-
modules/routes/routeDataConnections.py | 178 +++++++++++-
modules/routes/routeDataFiles.py | 2 +-
modules/routes/routeDataPrompts.py | 2 +-
modules/routes/routeDataSources.py | 74 ++++-
modules/routes/routeHelpers.py | 113 ++++++--
modules/routes/routeRagInventory.py | 267 ++++++++++++++++++
.../serviceBackgroundJobs/__init__.py | 6 +
.../mainBackgroundJobService.py | 120 +++++++-
.../subConnectorIngestConsumer.py | 79 +++++-
.../serviceKnowledge/subConnectorPrefs.py | 26 +-
.../subConnectorSyncClickup.py | 126 ++++++---
.../subConnectorSyncGdrive.py | 131 +++++----
.../serviceKnowledge/subConnectorSyncGmail.py | 97 +++++--
.../subConnectorSyncOutlook.py | 104 ++++---
.../subConnectorSyncSharepoint.py | 76 +++--
.../serviceKnowledge/subPolicyResolver.py | 78 +++++
modules/system/mainSystem.py | 8 +
scripts/script_db_migrate_datasource_rag.py | 88 ++++++
tests/unit/services/test_bootstrap_clickup.py | 41 ++-
tests/unit/services/test_bootstrap_gdrive.py | 19 +-
tests/unit/services/test_bootstrap_outlook.py | 4 +
32 files changed, 1450 insertions(+), 375 deletions(-)
create mode 100644 modules/routes/routeRagInventory.py
create mode 100644 modules/serviceCenter/services/serviceKnowledge/subPolicyResolver.py
create mode 100644 scripts/script_db_migrate_datasource_rag.py
diff --git a/app.py b/app.py
index f5adb3d7..73a64064 100644
--- a/app.py
+++ b/app.py
@@ -604,6 +604,9 @@ app.include_router(promptRouter)
from modules.routes.routeDataConnections import router as connectionsRouter
app.include_router(connectionsRouter)
+from modules.routes.routeRagInventory import router as ragInventoryRouter
+app.include_router(ragInventoryRouter)
+
from modules.routes.routeTableViews import router as tableViewsRouter
app.include_router(tableViewsRouter)
diff --git a/env-gateway-dev.env b/env-gateway-dev.env
index 7802b33d..f4e7e244 100644
--- a/env-gateway-dev.env
+++ b/env-gateway-dev.env
@@ -87,13 +87,6 @@ APP_DEBUG_CHAT_WORKFLOW_DIR = D:/Athi/Local/Web/poweron/local/debug
APP_DEBUG_ACCOUNTING_SYNC_ENABLED = True
APP_DEBUG_ACCOUNTING_SYNC_DIR = D:/Athi/Local/Web/poweron/local/debug/sync
-# Manadate Pre-Processing Servers
-PREPROCESS_ALTHAUS_CHAT_SECRET = DEV_ENC:Z0FBQUFBQnBudkpGbEphQ3ZUMlFMQ2EwSGpoSE9NNzRJNTJtaGk1N0RGakdIYnVVeVFHZmF5OXB3QTVWLVNaZk9wNkhfQkZWRnVwRGRxem9iRzJIWXdpX1NIN2FwSExfT3c9PQ==
-
-# Preprocessor API Configuration
-PP_QUERY_API_KEY=ouho02j0rj2oijroi3rj2oijro23jr0990
-PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataquery/query
-
# Azure Communication Services Email Configuration
MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt
MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss
diff --git a/env-gateway-int.env b/env-gateway-int.env
index a1924fff..0898a985 100644
--- a/env-gateway-int.env
+++ b/env-gateway-int.env
@@ -87,13 +87,6 @@ APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat
APP_DEBUG_ACCOUNTING_SYNC_ENABLED = FALSE
APP_DEBUG_ACCOUNTING_SYNC_DIR = ./debug/sync
-# Manadate Pre-Processing Servers
-PREPROCESS_ALTHAUS_CHAT_SECRET = INT_ENC:Z0FBQUFBQnBaSnM4UkNBelhvckxCQUVjZm94N3BZUDcxaEMyckE2dm1lRVhqODhrWU1SUjNXZ3dQZlVJOWhveXFkZXpobW5xT0NneGZ2SkNUblFmYXd0WTBYNTl3UmRnSWc9PQ==
-
-# Preprocessor API Configuration
-PP_QUERY_API_KEY=ouho02j0rj2oijroi3rj2oijro23jr0990
-PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataquery/query
-
# Azure Communication Services Email Configuration
MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt
MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss
diff --git a/env-gateway-prod-forgejo.env b/env-gateway-prod-forgejo.env
index b9c9e686..80a175e6 100644
--- a/env-gateway-prod-forgejo.env
+++ b/env-gateway-prod-forgejo.env
@@ -41,10 +41,10 @@ Service_MSFT_DATA_REDIRECT_URI = https://api.poweron.swiss/api/msft/auth/connect
Service_GOOGLE_AUTH_CLIENT_ID = 813678306829-3f23dnf1cs4aaftubjfickt46tlmkgjm.apps.googleusercontent.com
Service_GOOGLE_AUTH_CLIENT_SECRET = PROD_ENC:Z0FBQUFBQnFBa1kybjVVZ0FldUE1NTJiY2U1N0I0aVU0Z2hfeWlYc2tTdmlxTS1NdGxsRnFHdjZVcW5RRHZkUFhzUTVyX2RaZHlrQThRdTdCRmVBelBOcDlsbFQyd19SZExuWEM5aTcwQ0FvY3ctMUlWU1pndDE0MkdzeTZZRHkwLWU3aW56LW1jS20=
-Service_GOOGLE_AUTH_REDIRECT_URI =
+Service_GOOGLE_AUTH_REDIRECT_URI = https://api.poweron.swiss/api/google/auth/login/callback
Service_GOOGLE_DATA_CLIENT_ID = 813678306829-3f23dnf1cs4aaftubjfickt46tlmkgjm.apps.googleusercontent.com
Service_GOOGLE_DATA_CLIENT_SECRET = PROD_ENC:Z0FBQUFBQnFBa1kyMnFma3VPOVJtTFFrNDRLN0NkWHY2dUZDWlJzdDVMd3p3N19IY0tWdURRRzExOGZCMjJOYmpKT1E0cTVwYlgtcVJINTY0anZPc1VoTW00cHl6NVh3ZHVTek1oT1RqWUhtamRkZ1dENWlwNTlZSU1oNWczeGdEOC1Gbk5XU2RBcmI=
-Service_GOOGLE_DATA_REDIRECT_URI =
+Service_GOOGLE_DATA_REDIRECT_URI = https://api.poweron.swiss/api/google/auth/connect/callback
# ClickUp OAuth (Verbindungen / automation). Create an app in ClickUp: Settings → Apps → API; set redirect URL to Service_CLICKUP_OAUTH_REDIRECT_URI exactly.
Service_CLICKUP_CLIENT_ID = O3FX3H602A30MQN4I4SBNGJLIDBD5SL4
@@ -86,13 +86,6 @@ APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat
APP_DEBUG_ACCOUNTING_SYNC_ENABLED = FALSE
APP_DEBUG_ACCOUNTING_SYNC_DIR = ./debug/sync
-# Manadate Pre-Processing Servers
-PREPROCESS_ALTHAUS_CHAT_SECRET = PROD_ENC:Z0FBQUFBQnBaSnM4RVRmYW5IelNIbklTUDZIMEoycEN4ZFF0YUJoWWlUTUh2M0dhSXpYRXcwVkRGd1VieDNsYkdCRlpxMUR5Rjk1RDhPRkE5bmVtc2VDMURfLW9QNkxMVHN0M1JhbU9sa3JHWmdDZnlHS3BQRVBGTERVMHhXOVdDOWVqNkhfSUQyOHo=
-
-# Preprocessor API Configuration
-PP_QUERY_API_KEY=ouho02j0rj2oijroi3rj2oijro23jr0990
-PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataquery/query
-
# Azure Communication Services Email Configuration
MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt
MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss
diff --git a/env-gateway-prod.env b/env-gateway-prod.env
index d42bb0f9..6f4cfab0 100644
--- a/env-gateway-prod.env
+++ b/env-gateway-prod.env
@@ -87,13 +87,6 @@ APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat
APP_DEBUG_ACCOUNTING_SYNC_ENABLED = FALSE
APP_DEBUG_ACCOUNTING_SYNC_DIR = ./debug/sync
-# Manadate Pre-Processing Servers
-PREPROCESS_ALTHAUS_CHAT_SECRET = PROD_ENC:Z0FBQUFBQnBaSnM4RVRmYW5IelNIbklTUDZIMEoycEN4ZFF0YUJoWWlUTUh2M0dhSXpYRXcwVkRGd1VieDNsYkdCRlpxMUR5Rjk1RDhPRkE5bmVtc2VDMURfLW9QNkxMVHN0M1JhbU9sa3JHWmdDZnlHS3BQRVBGTERVMHhXOVdDOWVqNkhfSUQyOHo=
-
-# Preprocessor API Configuration
-PP_QUERY_API_KEY=ouho02j0rj2oijroi3rj2oijro23jr0990
-PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataquery/query
-
# Azure Communication Services Email Configuration
MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt
MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss
diff --git a/modules/datamodels/datamodelDataSource.py b/modules/datamodels/datamodelDataSource.py
index d9e40bde..fe3f0442 100644
--- a/modules/datamodels/datamodelDataSource.py
+++ b/modules/datamodels/datamodelDataSource.py
@@ -62,15 +62,15 @@ class DataSource(PowerOnModel):
description="Owner user ID",
json_schema_extra={"label": "Benutzer-ID", "fk_target": {"db": "poweron_app", "table": "UserInDB", "labelField": "username"}},
)
- autoSync: bool = Field(
+ ragIndexEnabled: bool = Field(
default=False,
- description="Automatically sync on schedule",
- json_schema_extra={"label": "Auto-Sync"},
+ description="When true this tree element is indexed into the RAG knowledge store",
+ json_schema_extra={"label": "Im RAG indexieren", "frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False},
)
- lastSynced: Optional[float] = Field(
+ lastIndexed: Optional[float] = Field(
default=None,
- description="Last sync timestamp",
- json_schema_extra={"label": "Letzter Sync", "frontend_type": "timestamp"},
+ description="Timestamp of last successful RAG indexing run",
+ json_schema_extra={"label": "Letzte Indexierung", "frontend_type": "timestamp"},
)
scope: str = Field(
default="personal",
diff --git a/modules/datamodels/datamodelUam.py b/modules/datamodels/datamodelUam.py
index 6aba24eb..f6cbd8fa 100644
--- a/modules/datamodels/datamodelUam.py
+++ b/modules/datamodels/datamodelUam.py
@@ -484,10 +484,10 @@ class UserConnection(PowerOnModel):
default=None,
description=(
"Per-connection knowledge ingestion preferences. schemaVersion=1 keys: "
- "neutralizeBeforeEmbed (bool), mailContentDepth (metadata|snippet|full), "
- "mailIndexAttachments (bool), filesIndexBinaries (bool), mimeAllowlist (list[str]), "
- "clickupScope (titles|title_description|with_comments), "
- "surfaceToggles (dict per authority), maxAgeDays (int)."
+ "mailContentDepth (metadata|snippet|full), mailIndexAttachments (bool), "
+ "filesIndexBinaries (bool), clickupScope (titles|title_description|with_comments), "
+ "clickupIndexAttachments (bool), maxAgeDays (int). "
+ "Neutralization is controlled per DataSource.neutralize (not here)."
),
json_schema_extra={"frontend_type": "json", "frontend_readonly": False, "frontend_required": False, "label": "Wissenspräferenzen"},
)
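
For reference, a schemaVersion=1 preferences object matching this description could look
like the following (a sketch; values are illustrative, and only the listed keys are
accepted by the knowledge-preferences endpoint added later in this patch):

    knowledgePreferences = {
        "schemaVersion": 1,
        "mailContentDepth": "snippet",        # "metadata" | "snippet" | "full"
        "mailIndexAttachments": False,
        "filesIndexBinaries": True,
        "clickupScope": "title_description",  # "titles" | "title_description" | "with_comments"
        "clickupIndexAttachments": False,
        "maxAgeDays": 365,                    # 0 = no limit
    }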
diff --git a/modules/features/workspace/mainWorkspace.py b/modules/features/workspace/mainWorkspace.py
index 24307b45..77f5b290 100644
--- a/modules/features/workspace/mainWorkspace.py
+++ b/modules/features/workspace/mainWorkspace.py
@@ -33,11 +33,6 @@ UI_OBJECTS = [
"label": t("Einstellungen", context="UI"),
"meta": {"area": "settings"}
},
- {
- "objectKey": "ui.feature.workspace.rag-insights",
- "label": t("Wissens-Insights", context="UI"),
- "meta": {"area": "rag-insights"},
- },
]
RESOURCE_OBJECTS = [
@@ -86,7 +81,6 @@ TEMPLATE_ROLES = [
{"context": "UI", "item": "ui.feature.workspace.dashboard", "view": True},
{"context": "UI", "item": "ui.feature.workspace.editor", "view": True},
{"context": "UI", "item": "ui.feature.workspace.settings", "view": True},
- {"context": "UI", "item": "ui.feature.workspace.rag-insights", "view": True},
{"context": "DATA", "item": None, "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"},
]
},
@@ -97,7 +91,6 @@ TEMPLATE_ROLES = [
{"context": "UI", "item": "ui.feature.workspace.dashboard", "view": True},
{"context": "UI", "item": "ui.feature.workspace.editor", "view": True},
{"context": "UI", "item": "ui.feature.workspace.settings", "view": True},
- {"context": "UI", "item": "ui.feature.workspace.rag-insights", "view": True},
{"context": "RESOURCE", "item": "resource.feature.workspace.start", "view": True},
{"context": "RESOURCE", "item": "resource.feature.workspace.stop", "view": True},
{"context": "RESOURCE", "item": "resource.feature.workspace.files", "view": True},
diff --git a/modules/features/workspace/routeFeatureWorkspace.py b/modules/features/workspace/routeFeatureWorkspace.py
index 9595fee4..4487e5fe 100644
--- a/modules/features/workspace/routeFeatureWorkspace.py
+++ b/modules/features/workspace/routeFeatureWorkspace.py
@@ -2192,49 +2192,4 @@ async def putWorkspaceUserSettings(
-# =========================================================================
-# RAG / Knowledge — anonymised instance statistics (presentation / KPIs)
-# =========================================================================
-def _collectWorkspaceFileIdsForStats(instanceId: str, mandateId: Optional[str]) -> List[str]:
- """All FileItem ids for this feature instance (any user). Knowledge rows are often stored
- without featureInstanceId; we correlate by file id from the Management DB."""
- from modules.datamodels.datamodelFiles import FileItem
- from modules.interfaces.interfaceDbManagement import ComponentObjects
-
- co = ComponentObjects()
- rows = co.db.getRecordset(FileItem, recordFilter={"featureInstanceId": instanceId})
- out: List[str] = []
- m = str(mandateId) if mandateId else ""
- for r in rows or []:
- rid = r.get("id") if isinstance(r, dict) else getattr(r, "id", None)
- if not rid:
- continue
- if m:
- mid = r.get("mandateId") if isinstance(r, dict) else getattr(r, "mandateId", "") or ""
- if mid and mid != m:
- continue
- out.append(str(rid))
- return out
-
-
-@router.get("/{instanceId}/rag-statistics")
-@limiter.limit("60/minute")
-async def getRagStatistics(
- request: Request,
- instanceId: str = Path(...),
- days: int = Query(90, ge=7, le=365, description="Timeline window in days"),
- context: RequestContext = Depends(getRequestContext),
-):
- """Aggregated, non-identifying knowledge-store metrics for this workspace instance."""
- mandateId, _instanceConfig = _validateInstanceAccess(instanceId, context)
- workspaceFileIds = _collectWorkspaceFileIdsForStats(instanceId, mandateId)
- kdb = getKnowledgeInterface(context.user)
- stats = kdb.getRagStatisticsForInstance(
- featureInstanceId=instanceId,
- mandateId=str(mandateId) if mandateId else "",
- timelineDays=days,
- workspaceFileIds=workspaceFileIds,
- )
- if isinstance(stats, dict):
- stats.setdefault("scope", {})
- stats["scope"]["workspaceFileIdsResolved"] = len(workspaceFileIds)
- return JSONResponse(stats)
diff --git a/modules/interfaces/interfaceDbKnowledge.py b/modules/interfaces/interfaceDbKnowledge.py
index c2f79b67..31a5af61 100644
--- a/modules/interfaces/interfaceDbKnowledge.py
+++ b/modules/interfaces/interfaceDbKnowledge.py
@@ -133,6 +133,60 @@ class KnowledgeObjects:
return {"indexRows": indexCount, "chunks": chunkCount}
+ def deleteFileContentIndexByDataSource(self, dataSourceId: str) -> Dict[str, int]:
+ """Delete all FileContentIndex rows whose provenance.dataSourceId matches.
+
+ Used when a user disables ragIndexEnabled on a DataSource to purge
+ only those chunks that were ingested from that specific tree element.
+ """
+ if not dataSourceId:
+ return {"indexRows": 0, "chunks": 0}
+
+ allRows = self.db.getRecordset(FileContentIndex)
+ matchedRows = []
+ for row in allRows:
+ prov = row.get("provenance") if isinstance(row, dict) else getattr(row, "provenance", None)
+ if isinstance(prov, dict) and prov.get("dataSourceId") == dataSourceId:
+ matchedRows.append(row)
+
+ mandateIds: set = set()
+ chunkCount = 0
+ indexCount = 0
+ for row in matchedRows:
+ fid = row.get("id") if isinstance(row, dict) else getattr(row, "id", None)
+ mid = row.get("mandateId") if isinstance(row, dict) else getattr(row, "mandateId", "")
+ if not fid:
+ continue
+ chunks = self.db.getRecordset(ContentChunk, recordFilter={"fileId": fid})
+ for chunk in chunks:
+ cid = chunk.get("id") if isinstance(chunk, dict) else getattr(chunk, "id", None)
+ if cid and self.db.recordDelete(ContentChunk, cid):
+ chunkCount += 1
+ if self.db.recordDelete(FileContentIndex, fid):
+ indexCount += 1
+ if mid:
+ mandateIds.add(str(mid))
+
+ for mid in mandateIds:
+ try:
+ from modules.interfaces.interfaceDbBilling import _getRootInterface
+ _getRootInterface().reconcileMandateStorageBilling(mid)
+ except Exception as ex:
+ logger.warning("reconcileMandateStorageBilling after datasource purge failed: %s", ex)
+
+ return {"indexRows": indexCount, "chunks": chunkCount}
+
+ def listFileContentIndexByDataSource(self, dataSourceId: str) -> List[Dict[str, Any]]:
+ """List all FileContentIndex rows whose provenance.dataSourceId matches."""
+ if not dataSourceId:
+ return []
+ allRows = self.db.getRecordset(FileContentIndex)
+ out = []
+ for row in allRows:
+ prov = row.get("provenance") if isinstance(row, dict) else getattr(row, "provenance", None)
+ if isinstance(prov, dict) and prov.get("dataSourceId") == dataSourceId:
+ out.append(dict(row) if not isinstance(row, dict) else row)
+ return out
+
def deleteFileContentIndex(self, fileId: str) -> bool:
"""Delete a FileContentIndex and all associated ContentChunks."""
existing = self.getFileContentIndex(fileId)
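
The two new helpers pair up for a safe purge flow: list first to preview impact, then
delete. A minimal sketch, assuming the root-scoped interface (getInterface(None)) used
elsewhere in this patch and a hypothetical sourceId:

    from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface

    kdb = getKnowledgeInterface(None)
    preview = kdb.listFileContentIndexByDataSource(sourceId)   # dry run, deletes nothing
    print(f"purge would remove {len(preview)} FileContentIndex rows")
    stats = kdb.deleteFileContentIndexByDataSource(sourceId)
    # stats == {"indexRows": ..., "chunks": ...}; storage billing is
    # reconciled per affected mandate inside the delete call.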
diff --git a/modules/routes/routeBilling.py b/modules/routes/routeBilling.py
index b7fcdeca..04251e09 100644
--- a/modules/routes/routeBilling.py
+++ b/modules/routes/routeBilling.py
@@ -1986,10 +1986,10 @@ def getUserViewTransactions(
if not pagination:
raise HTTPException(status_code=400, detail="pagination required for groupSummary")
import json as _json
- from collections import defaultdict
from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
from modules.routes.routeHelpers import (
applyViewToParams,
+ build_group_summary_groups,
effective_group_by_levels,
resolveView,
)
@@ -2018,28 +2018,7 @@ def getUserViewTransactions(
summary_params,
ctx.user,
)
- counts: Dict[str, int] = defaultdict(int)
- labels: Dict[str, str] = {}
- null_key = "\x00NULL"
- for item in all_rows:
- raw = item.get(field)
- if raw is None or raw == "":
- nk = null_key
- labels[nk] = null_label
- else:
- nk = str(raw)
- if nk not in labels:
- labels[nk] = nk
- counts[nk] += 1
- groups_out: List[Dict[str, Any]] = []
- for nk in sorted(counts.keys(), key=lambda x: (x == null_key, labels.get(x, x).lower())):
- groups_out.append(
- {
- "value": None if nk == null_key else nk,
- "label": labels.get(nk, nk),
- "totalCount": counts[nk],
- }
- )
+ groups_out = build_group_summary_groups(all_rows, field, null_label, groupByLevels=levels)
return JSONResponse(content={"groups": groups_out})
paginationParams = None
diff --git a/modules/routes/routeDataConnections.py b/modules/routes/routeDataConnections.py
index 58d36b91..04f652fb 100644
--- a/modules/routes/routeDataConnections.py
+++ b/modules/routes/routeDataConnections.py
@@ -130,7 +130,7 @@ def get_auth_authority_options(
# ============================================================================
@router.get("/")
-@limiter.limit("30/minute")
+@limiter.limit("60/minute")
async def get_connections(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
@@ -197,7 +197,9 @@ async def get_connections(
"lastChecked": connection.lastChecked,
"expiresAt": connection.expiresAt,
"tokenStatus": tokenStatus,
- "tokenExpiresAt": tokenExpiresAt
+ "tokenExpiresAt": tokenExpiresAt,
+ "knowledgeIngestionEnabled": getattr(connection, "knowledgeIngestionEnabled", False),
+ "knowledgePreferences": getattr(connection, "knowledgePreferences", None) or {},
})
return items
@@ -264,7 +266,7 @@ async def get_connections(
})
enrichRowsWithFkLabels(enhanced_connections_dict, UserConnection)
filtered = apply_strategy_b_filters_and_sort(enhanced_connections_dict, paginationParams, currentUser)
- groups_out = build_group_summary_groups(filtered, field, null_label)
+ groups_out = build_group_summary_groups(filtered, field, null_label, groupByLevels=groupByLevels)
return JSONResponse(content={"groups": groups_out})
try:
@@ -724,4 +726,172 @@ def delete_connection(
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete connection: {str(e)}"
- )
\ No newline at end of file
+ )
+
+
+# =========================================================================
+# Knowledge Consent & Control Endpoints
+# =========================================================================
+
+
+def _findOwnConnection(interface, userId: str, connectionId: str):
+ """Find a connection owned by the user. Returns None if not found."""
+ connections = interface.getUserConnections(userId)
+ for conn in connections:
+ if conn.id == connectionId:
+ return conn
+ return None
+
+
+@router.patch("/{connectionId}/knowledge-consent")
+@limiter.limit("10/minute")
+def _updateKnowledgeConsent(
+ request: Request,
+ connectionId: str = Path(..., description="Connection ID"),
+ enabled: bool = Body(..., embed=True),
+ currentUser: User = Depends(getCurrentUser),
+) -> Dict[str, Any]:
+ """Master switch: can PowerOn ingest data from this connection into the RAG knowledge store?
+
+ enabled=False: purge ALL chunks for this connection + cancel running jobs.
+ enabled=True: set flag; enqueue bootstrap only if rag-enabled DataSources exist.
+ """
+ try:
+ interface = getInterface(currentUser)
+ connection = _findOwnConnection(interface, currentUser.id, connectionId)
+ if not connection:
+ raise HTTPException(status_code=404, detail=routeApiMsg("Connection not found"))
+
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ rootIf = getRootInterface()
+ rootIf.db.recordModify(UserConnection, connectionId, {"knowledgeIngestionEnabled": enabled})
+
+ purged = None
+ cancelled = 0
+ bootstrapEnqueued = False
+
+ if not enabled:
+ from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
+ purged = getKnowledgeInterface(None).deleteFileContentIndexByConnectionId(connectionId)
+
+ from modules.serviceCenter.services.serviceBackgroundJobs import cancelJobsByConnection
+ cancelled = cancelJobsByConnection(connectionId)
+ else:
+ from modules.datamodels.datamodelDataSource import DataSource
+ dataSources = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId, "ragIndexEnabled": True})
+ if dataSources:
+ import asyncio
+ from modules.serviceCenter.services.serviceBackgroundJobs import startJob
+ authority = connection.authority.value if hasattr(connection.authority, "value") else str(connection.authority or "")
+
+ async def _enqueue():
+ await startJob(
+ "connection.bootstrap",
+ {"connectionId": connectionId, "authority": authority.lower()},
+ triggeredBy=str(currentUser.id),
+ )
+ try:
+ loop = asyncio.get_event_loop()
+ if loop.is_running():
+ loop.create_task(_enqueue())
+ else:
+ loop.run_until_complete(_enqueue())
+ except RuntimeError:
+ asyncio.run(_enqueue())
+ bootstrapEnqueued = True
+
+ import json as _json
+ from modules.shared.auditLogger import audit_logger
+ from modules.datamodels.datamodelAudit import AuditCategory
+ audit_logger.logEvent(
+ userId=str(currentUser.id),
+ mandateId=str(getattr(connection, "mandateId", "") or ""),
+ category=AuditCategory.PERMISSION.value,
+ action="knowledge_consent_changed",
+ details=_json.dumps({"connectionId": connectionId, "enabled": enabled}),
+ )
+
+ logger.info("Knowledge consent %s for connection %s by user %s",
+ "enabled" if enabled else "disabled", connectionId, currentUser.id)
+ return {
+ "connectionId": connectionId,
+ "knowledgeIngestionEnabled": enabled,
+ "purged": purged,
+ "cancelledJobs": cancelled,
+ "bootstrapEnqueued": bootstrapEnqueued,
+ }
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error("Error updating knowledge consent: %s", e, exc_info=True)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.patch("/{connectionId}/knowledge-preferences")
+@limiter.limit("20/minute")
+def _updateKnowledgePreferences(
+ request: Request,
+ connectionId: str = Path(..., description="Connection ID"),
+ preferences: Dict[str, Any] = Body(..., embed=True),
+ currentUser: User = Depends(getCurrentUser),
+) -> Dict[str, Any]:
+ """Update per-connection knowledge ingestion preferences (mail depth, attachments, etc.)."""
+ _ALLOWED_KEYS = {"mailContentDepth", "mailIndexAttachments", "filesIndexBinaries",
+ "clickupScope", "clickupIndexAttachments", "maxAgeDays"}
+ try:
+ interface = getInterface(currentUser)
+ connection = _findOwnConnection(interface, currentUser.id, connectionId)
+ if not connection:
+ raise HTTPException(status_code=404, detail=routeApiMsg("Connection not found"))
+
+ existing = getattr(connection, "knowledgePreferences", None) or {}
+ cleaned = {k: v for k, v in preferences.items() if k in _ALLOWED_KEYS}
+ merged = {**existing, **cleaned, "schemaVersion": 1}
+
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ getRootInterface().db.recordModify(UserConnection, connectionId, {"knowledgePreferences": merged})
+
+ logger.info("Knowledge preferences updated for connection %s", connectionId)
+ return {"connectionId": connectionId, "knowledgePreferences": merged, "updated": True}
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error("Error updating knowledge preferences: %s", e, exc_info=True)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post("/{connectionId}/knowledge-stop")
+@limiter.limit("10/minute")
+def _stopKnowledgeJobs(
+ request: Request,
+ connectionId: str = Path(..., description="Connection ID"),
+ currentUser: User = Depends(getCurrentUser),
+) -> Dict[str, Any]:
+ """Cancel all running/pending bootstrap jobs for this connection."""
+ try:
+ interface = getInterface(currentUser)
+ connection = _findOwnConnection(interface, currentUser.id, connectionId)
+ if not connection:
+ raise HTTPException(status_code=404, detail=routeApiMsg("Connection not found"))
+
+ from modules.serviceCenter.services.serviceBackgroundJobs import cancelJobsByConnection
+ cancelled = cancelJobsByConnection(connectionId)
+
+ import json as _json
+ from modules.shared.auditLogger import audit_logger
+ from modules.datamodels.datamodelAudit import AuditCategory
+ audit_logger.logEvent(
+ userId=str(currentUser.id),
+ mandateId=str(getattr(connection, "mandateId", "") or ""),
+ category=AuditCategory.PERMISSION.value,
+ action="knowledge_jobs_stopped",
+ details=_json.dumps({"connectionId": connectionId, "cancelledCount": cancelled}),
+ )
+
+ logger.info("Stopped %d knowledge jobs for connection %s", cancelled, connectionId)
+ return {"connectionId": connectionId, "cancelled": cancelled}
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error("Error stopping knowledge jobs: %s", e, exc_info=True)
+ raise HTTPException(status_code=500, detail=str(e))
\ No newline at end of file
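
Client-side, the consent switch is a single PATCH with an embedded body field (FastAPI's
Body(..., embed=True) expects {"enabled": ...}). A sketch with httpx; the host, route
prefix, connection ID and token are placeholders, since the router prefix is defined
outside this patch:

    import httpx

    resp = httpx.patch(
        f"https://api.example.test/api/connections/{connectionId}/knowledge-consent",
        json={"enabled": False},  # disable: purges chunks + cancels running jobs
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    resp.raise_for_status()
    print(resp.json())  # {"connectionId": ..., "purged": {...}, "cancelledJobs": n, ...}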
diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py
index 244b77b0..3a951f3e 100644
--- a/modules/routes/routeDataFiles.py
+++ b/modules/routes/routeDataFiles.py
@@ -543,7 +543,7 @@ def get_files(
FileItem,
)
filtered = apply_strategy_b_filters_and_sort(allItems, paginationParams, currentUser)
- groups_out = build_group_summary_groups(filtered, field, null_label)
+ groups_out = build_group_summary_groups(filtered, field, null_label, groupByLevels=groupByLevels)
return JSONResponse(content={"groups": groups_out})
if mode == "filterValues":
diff --git a/modules/routes/routeDataPrompts.py b/modules/routes/routeDataPrompts.py
index c410d26a..331267b5 100644
--- a/modules/routes/routeDataPrompts.py
+++ b/modules/routes/routeDataPrompts.py
@@ -100,7 +100,7 @@ def get_prompts(
result if isinstance(result, list) else (result.items if hasattr(result, "items") else [])
)
filtered = apply_strategy_b_filters_and_sort(allItems, paginationParams, currentUser)
- groups_out = build_group_summary_groups(filtered, field, null_label)
+ groups_out = build_group_summary_groups(filtered, field, null_label, groupByLevels=groupByLevels)
return JSONResponse(content={"groups": groups_out})
if mode == "filterValues":
diff --git a/modules/routes/routeDataSources.py b/modules/routes/routeDataSources.py
index 5df8a18b..f7e5425d 100644
--- a/modules/routes/routeDataSources.py
+++ b/modules/routes/routeDataSources.py
@@ -1,6 +1,6 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
-"""PATCH endpoints for DataSource and FeatureDataSource scope/neutralize tagging."""
+"""PATCH endpoints for DataSource and FeatureDataSource scope/neutralize/rag-index tagging."""
import logging
from typing import Any, Dict, List, Optional
@@ -125,3 +125,75 @@ def _updateNeutralizeFields(
except Exception as e:
logger.error("Error updating neutralizeFields: %s", e)
raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.patch("/{sourceId}/rag-index")
+@limiter.limit("30/minute")
+def _updateDataSourceRagIndex(
+ request: Request,
+ sourceId: str = Path(..., description="ID of the DataSource"),
+ ragIndexEnabled: bool = Body(..., embed=True),
+ context: RequestContext = Depends(getRequestContext),
+) -> Dict[str, Any]:
+ """Toggle RAG indexing for a DataSource.
+
+ true: sets flag + enqueues mini-bootstrap for this DataSource only.
+ false: sets flag + synchronously purges all chunks from this DataSource.
+ """
+ try:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ rootIf = getRootInterface()
+ rec = rootIf.db.getRecord(DataSource, sourceId)
+ if not rec:
+ raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")
+
+ rootIf.db.recordModify(DataSource, sourceId, {"ragIndexEnabled": ragIndexEnabled})
+ logger.info("Updated ragIndexEnabled=%s for DataSource %s", ragIndexEnabled, sourceId)
+
+ if ragIndexEnabled:
+ from modules.serviceCenter.services.serviceBackgroundJobs import startJob
+ import asyncio
+
+ connectionId = rec.get("connectionId") or rec.get("connection_id") or ""
+ conn = rootIf.getUserConnectionById(connectionId) if connectionId else None
+ authority = ""
+ if conn:
+ authority = conn.authority.value if hasattr(conn.authority, "value") else str(conn.authority or "")
+
+ async def _enqueue():
+ await startJob(
+ "connection.bootstrap",
+ {"connectionId": connectionId, "authority": authority.lower(), "dataSourceIds": [sourceId]},
+ triggeredBy=str(context.user.id),
+ )
+ try:
+ loop = asyncio.get_event_loop()
+ if loop.is_running():
+ loop.create_task(_enqueue())
+ else:
+ loop.run_until_complete(_enqueue())
+ except RuntimeError:
+ asyncio.run(_enqueue())
+ else:
+ from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
+ purgeResult = getKnowledgeInterface(None).deleteFileContentIndexByDataSource(sourceId)
+ logger.info("Purged %d index rows / %d chunks for DataSource %s",
+ purgeResult.get("indexRows", 0), purgeResult.get("chunks", 0), sourceId)
+
+ import json
+ from modules.shared.auditLogger import audit_logger
+ from modules.datamodels.datamodelAudit import AuditCategory
+ audit_logger.logEvent(
+ userId=str(context.user.id),
+ mandateId=context.mandateId,
+ category=AuditCategory.PERMISSION.value,
+ action="rag_index_toggled",
+ details=json.dumps({"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled}),
+ )
+
+ return {"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled, "updated": True}
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error("Error updating datasource ragIndexEnabled: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
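
The enqueue-from-a-sync-route dance above (create_task on a running loop, otherwise
run_until_complete, with asyncio.run as fallback) now appears three times in this patch:
here, in the knowledge-consent endpoint, and in routeRagInventory. A follow-up could
hoist it into a shared helper along these lines (a sketch, not part of the patch):

    import asyncio
    from typing import Any, Coroutine

    def runInAnyContext(coro: Coroutine[Any, Any, Any]) -> None:
        """Schedule *coro* whether or not an event loop is already running."""
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                loop.create_task(coro)    # fire-and-forget on the live loop
            else:
                loop.run_until_complete(coro)
        except RuntimeError:
            asyncio.run(coro)             # no usable loop in this thread

Callers would then collapse to runInAnyContext(_enqueue()).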
diff --git a/modules/routes/routeHelpers.py b/modules/routes/routeHelpers.py
index f5af7d06..f1d88e31 100644
--- a/modules/routes/routeHelpers.py
+++ b/modules/routes/routeHelpers.py
@@ -825,45 +825,106 @@ def build_group_summary_groups(
items: List[Dict[str, Any]],
field: str,
null_label: str = "—",
+ groupByLevels: List[Dict[str, Any]] | None = None,
) -> List[Dict[str, Any]]:
"""
- Build {"value", "label", "totalCount"} for mode=groupSummary (single grouping level).
+ Build {"value", "label", "totalCount"} summaries for mode=groupSummary.
+
+ When *groupByLevels* contains more than one level the function produces one
+ entry per unique combination of all level values (flat permutations).
+ ``value`` becomes a ``///``-joined composite key (null segments rendered as
+ ``__null__``) and ``label`` the ``" / "``-joined human-readable label, so the
+ frontend can split them back.
"""
from collections import defaultdict
- counts: Dict[str, int] = defaultdict(int)
- display_by_key: Dict[str, str] = {}
- null_key = "\x00NULL"
- label_attr = f"{field}Label"
+ fields: list[dict] = []
+ if groupByLevels and len(groupByLevels) > 1:
+ for lvl in groupByLevels:
+ f = lvl.get("field", "")
+ nl = str(lvl.get("nullLabel") or null_label)
+ if f:
+ fields.append({"field": f, "nullLabel": nl})
+ if not fields:
+ fields = [{"field": field, "nullLabel": null_label}]
+ nullKey = "\x00NULL"
+
+ if len(fields) == 1:
+ f = fields[0]["field"]
+ nl = fields[0]["nullLabel"]
+ counts: Dict[str, int] = defaultdict(int)
+ displayByKey: Dict[str, str] = {}
+ labelAttr = f"{f}Label"
+ for item in items:
+ raw = item.get(f)
+ if raw is None or raw == "":
+ nk = nullKey
+ display = nl
+ else:
+ nk = str(raw)
+ display = None
+ lbl = item.get(labelAttr)
+ if lbl is not None and lbl != "":
+ display = str(lbl)
+ if display is None:
+ display = nk
+ counts[nk] += 1
+ if nk not in displayByKey:
+ displayByKey[nk] = display
+ orderedKeys = sorted(
+ counts.keys(),
+ key=lambda x: (x == nullKey, str(displayByKey.get(x, x)).lower()),
+ )
+ return [
+ {
+ "value": None if nk == nullKey else nk,
+ "label": displayByKey.get(nk, nk),
+ "totalCount": counts[nk],
+ }
+ for nk in orderedKeys
+ ]
+
+ counts = defaultdict(int)
+ displayByComposite: Dict[str, list] = {}
+ filtersByComposite: Dict[str, dict] = {}
for item in items:
- raw = item.get(field)
- if raw is None or raw == "":
- nk = null_key
- display = null_label
- else:
- nk = str(raw)
- display = None
- lbl = item.get(label_attr)
- if lbl is not None and lbl != "":
- display = str(lbl)
- if display is None:
- display = nk
- counts[nk] += 1
- if nk not in display_by_key:
- display_by_key[nk] = display
+ parts: list[str] = []
+ labels: list[str] = []
+ filterMap: dict = {}
+ for fd in fields:
+ f = fd["field"]
+ nl = fd["nullLabel"]
+ labelAttr = f"{f}Label"
+ raw = item.get(f)
+ if raw is None or raw == "":
+ parts.append(nullKey)
+ labels.append(nl)
+ filterMap[f] = None
+ else:
+ parts.append(str(raw))
+ lbl = item.get(labelAttr)
+ labels.append(str(lbl) if lbl not in (None, "") else str(raw))
+ filterMap[f] = str(raw)
+ compositeKey = "///".join(parts)
+ counts[compositeKey] += 1
+ if compositeKey not in displayByComposite:
+ displayByComposite[compositeKey] = labels
+ filtersByComposite[compositeKey] = filterMap
- ordered_keys = sorted(
+ orderedKeys = sorted(
counts.keys(),
- key=lambda x: (x == null_key, str(display_by_key.get(x, x)).lower()),
+ key=lambda x: tuple(
+ (seg == nullKey, seg.lower()) for seg in x.split("///")
+ ),
)
return [
{
- "value": None if nk == null_key else nk,
- "label": display_by_key.get(nk, nk),
- "totalCount": counts[nk],
+ "value": ck.replace(nullKey, "__null__") if nullKey in ck else ck,
+ "label": " / ".join(displayByComposite[ck]),
+ "totalCount": counts[ck],
+ "filters": filtersByComposite[ck],
}
- for nk in ordered_keys
+ for ck in orderedKeys
]
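
A worked example of the multi-level branch (field names and rows invented for
illustration):

    items = [
        {"status": "open",   "owner": "u1", "ownerLabel": "Alice"},
        {"status": "open",   "owner": "u1", "ownerLabel": "Alice"},
        {"status": "closed", "owner": None},
    ]
    levels = [{"field": "status"}, {"field": "owner", "nullLabel": "—"}]
    build_group_summary_groups(items, "status", groupByLevels=levels)
    # [
    #   {"value": "closed///__null__", "label": "closed / —", "totalCount": 1,
    #    "filters": {"status": "closed", "owner": None}},
    #   {"value": "open///u1", "label": "open / Alice", "totalCount": 2,
    #    "filters": {"status": "open", "owner": "u1"}},
    # ]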
diff --git a/modules/routes/routeRagInventory.py b/modules/routes/routeRagInventory.py
new file mode 100644
index 00000000..08d2a245
--- /dev/null
+++ b/modules/routes/routeRagInventory.py
@@ -0,0 +1,267 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""RAG Inventory API — global knowledge-store visibility for users, admins, platform."""
+
+import logging
+from typing import Any, Dict, List, Optional
+
+from fastapi import APIRouter, HTTPException, Depends, Request
+from modules.auth import limiter, getCurrentUser, getRequestContext, RequestContext
+from modules.datamodels.datamodelUam import User
+from modules.shared.i18nRegistry import apiRouteContext
+
+routeApiMsg = apiRouteContext("routeRagInventory")
+logger = logging.getLogger(__name__)
+
+router = APIRouter(
+ prefix="/api/rag/inventory",
+ tags=["RAG Inventory"],
+ responses={
+ 401: {"description": "Unauthorized"},
+ 403: {"description": "Forbidden"},
+ 500: {"description": "Internal server error"},
+ },
+)
+
+
+def _buildConnectionInventory(connections, rootIf, knowledgeIf, jobService) -> List[Dict[str, Any]]:
+ from modules.datamodels.datamodelDataSource import DataSource
+ from modules.datamodels.datamodelKnowledge import FileContentIndex
+
+ out = []
+ for conn in connections:
+ connectionId = str(conn.id)
+ dataSources = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId})
+
+ connIndexRows = knowledgeIf.db.getRecordset(FileContentIndex, recordFilter={"connectionId": connectionId})
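+ # NB: this counts FileContentIndex rows (one per indexed file) and uses the
+ # figure as a proxy for chunk volume; exact ContentChunk totals would need a
+ # per-file query as in deleteFileContentIndex.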
+ connChunkTotal = len(connIndexRows)
+
+ dsItems = []
+ for ds in dataSources:
+ dsId = ds.get("id") if isinstance(ds, dict) else getattr(ds, "id", "")
+ dsItems.append({
+ "id": dsId,
+ "label": ds.get("label") if isinstance(ds, dict) else getattr(ds, "label", ""),
+ "path": ds.get("path") if isinstance(ds, dict) else getattr(ds, "path", ""),
+ "sourceType": ds.get("sourceType") if isinstance(ds, dict) else getattr(ds, "sourceType", ""),
+ "ragIndexEnabled": ds.get("ragIndexEnabled") if isinstance(ds, dict) else getattr(ds, "ragIndexEnabled", False),
+ "neutralize": ds.get("neutralize") if isinstance(ds, dict) else getattr(ds, "neutralize", False),
+ "lastIndexed": ds.get("lastIndexed") if isinstance(ds, dict) else getattr(ds, "lastIndexed", None),
+ "chunkCount": 0,
+ })
+
+ if dsItems and connChunkTotal > 0 and len(dsItems) == 1:
+ dsItems[0]["chunkCount"] = connChunkTotal
+
+ jobs = jobService.listJobs(jobType="connection.bootstrap", limit=5)
+ connJobs = [j for j in jobs if (j.get("payload") or {}).get("connectionId") == connectionId]
+ runningJobs = [
+ {"jobId": j["id"], "progress": j.get("progress", 0), "progressMessage": j.get("progressMessage", "")}
+ for j in connJobs
+ if j.get("status") in ("PENDING", "RUNNING")
+ ]
+ lastError = None
+ for j in connJobs:
+ if j.get("status") == "ERROR":
+ lastError = {"jobId": j["id"], "errorMessage": j.get("errorMessage", "")}
+ break
+
+ out.append({
+ "id": connectionId,
+ "authority": conn.authority.value if hasattr(conn.authority, "value") else str(conn.authority),
+ "externalEmail": getattr(conn, "externalEmail", ""),
+ "knowledgeIngestionEnabled": getattr(conn, "knowledgeIngestionEnabled", False),
+ "preferences": getattr(conn, "knowledgePreferences", None) or {},
+ "dataSources": dsItems,
+ "totalChunks": connChunkTotal,
+ "runningJobs": runningJobs,
+ "lastError": lastError,
+ })
+ return out
+
+
+@router.get("/me")
+@limiter.limit("30/minute")
+def _getInventoryMe(
+ request: Request,
+ currentUser: User = Depends(getCurrentUser),
+) -> Dict[str, Any]:
+ """Personal RAG inventory: own connections + DataSources + chunk counts."""
+ try:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
+ from modules.serviceCenter.services.serviceBackgroundJobs import mainBackgroundJobService as jobService
+
+ rootIf = getRootInterface()
+ knowledgeIf = getKnowledgeInterface(None)
+ connections = rootIf.getUserConnections(currentUser.id)
+
+ items = _buildConnectionInventory(connections, rootIf, knowledgeIf, jobService)
+ totalChunks = sum(c.get("totalChunks", 0) for c in items)
+
+ return {"connections": items, "totals": {"chunks": totalChunks}}
+ except Exception as e:
+ logger.error("Error in RAG inventory /me: %s", e, exc_info=True)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get("/mandate")
+@limiter.limit("20/minute")
+def _getInventoryMandate(
+ request: Request,
+ context: RequestContext = Depends(getRequestContext),
+) -> Dict[str, Any]:
+ """Mandate-level RAG aggregation (requires mandate membership)."""
+ if not context.mandateId:
+ raise HTTPException(status_code=403, detail=routeApiMsg("Mandate context required"))
+ try:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface, aggregateMandateRagTotalBytes
+ from modules.serviceCenter.services.serviceBackgroundJobs import mainBackgroundJobService as jobService
+
+ rootIf = getRootInterface()
+ knowledgeIf = getKnowledgeInterface(None)
+ mandateId = str(context.mandateId) if context.mandateId else ""
+
+ from modules.datamodels.datamodelUam import UserConnection
+ allConnections = rootIf.db.getRecordset(UserConnection, recordFilter={"mandateId": mandateId})
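+ # getRecordset returns plain dicts here; _buildConnectionInventory expects
+ # attribute access, so wrap each row in a throwaway class (attrs = dict keys).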
+ connectionObjects = [type("C", (), row)() if isinstance(row, dict) else row for row in allConnections]
+
+ items = _buildConnectionInventory(connectionObjects, rootIf, knowledgeIf, jobService)
+ totalChunks = sum(c.get("totalChunks", 0) for c in items)
+ totalBytes = aggregateMandateRagTotalBytes(mandateId)
+
+ return {"connections": items, "totals": {"chunks": totalChunks, "bytes": totalBytes}}
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error("Error in RAG inventory /mandate: %s", e, exc_info=True)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get("/platform")
+@limiter.limit("10/minute")
+def _getInventoryPlatform(
+ request: Request,
+ context: RequestContext = Depends(getRequestContext),
+) -> Dict[str, Any]:
+ """Platform-wide RAG statistics (sysadmin only)."""
+ if not context.isSysAdmin:
+ raise HTTPException(status_code=403, detail=routeApiMsg("Platform admin required"))
+ try:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
+ from modules.serviceCenter.services.serviceBackgroundJobs import mainBackgroundJobService as jobService
+ from modules.datamodels.datamodelUam import UserConnection
+
+ rootIf = getRootInterface()
+ knowledgeIf = getKnowledgeInterface(None)
+ allConnections = rootIf.db.getRecordset(UserConnection)
+ connectionObjects = [type("C", (), row)() if isinstance(row, dict) else row for row in allConnections]
+
+ items = _buildConnectionInventory(connectionObjects, rootIf, knowledgeIf, jobService)
+ totalChunks = sum(c.get("totalChunks", 0) for c in items)
+
+ return {"connections": items, "totals": {"chunks": totalChunks}}
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error("Error in RAG inventory /platform: %s", e, exc_info=True)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post("/reindex/{connectionId}")
+@limiter.limit("10/minute")
+def _reindexConnection(
+ request: Request,
+ connectionId: str,
+ currentUser: User = Depends(getCurrentUser),
+) -> Dict[str, Any]:
+ """Re-trigger bootstrap for a connection (re-index all ragIndexEnabled DataSources).
+
+ Submits a new connection.bootstrap job, regardless of previous failures.
+ """
+ try:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.serviceCenter.services.serviceBackgroundJobs import startJob
+ from modules.datamodels.datamodelDataSource import DataSource
+ import asyncio
+
+ rootIf = getRootInterface()
+ conn = rootIf.getUserConnectionById(connectionId)
+ if conn is None:
+ raise HTTPException(status_code=404, detail="Connection not found")
+
+ if str(conn.userId) != str(currentUser.id):
+ raise HTTPException(status_code=403, detail="Not your connection")
+
+ dataSources = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId})
+ ragDs = [ds for ds in dataSources if (ds.get("ragIndexEnabled") if isinstance(ds, dict) else getattr(ds, "ragIndexEnabled", False))]
+ if not ragDs:
+ return {"status": "skipped", "reason": "no_rag_enabled_datasources"}
+
+ authority = conn.authority.value if hasattr(conn.authority, "value") else str(conn.authority or "")
+ dsIds = [(ds.get("id") if isinstance(ds, dict) else getattr(ds, "id", "")) for ds in ragDs]
+
+ async def _enqueue():
+ return await startJob(
+ "connection.bootstrap",
+ {"connectionId": connectionId, "authority": authority.lower(), "dataSourceIds": dsIds},
+ triggeredBy=str(currentUser.id),
+ )
+ try:
+ loop = asyncio.get_event_loop()
+ if loop.is_running():
+ future = asyncio.ensure_future(_enqueue())
+ jobId = None
+ else:
+ jobId = loop.run_until_complete(_enqueue())
+ except RuntimeError:
+ jobId = asyncio.run(_enqueue())
+
+ logger.info("Reindex triggered for connection %s (%d DataSources)", connectionId, len(dsIds))
+ return {"status": "queued", "connectionId": connectionId, "dataSourceCount": len(dsIds), "jobId": jobId}
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error("Error triggering reindex: %s", e, exc_info=True)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get("/jobs")
+@limiter.limit("60/minute")
+def _getActiveJobs(
+ request: Request,
+ currentUser: User = Depends(getCurrentUser),
+) -> List[Dict[str, Any]]:
+ """Active RAG jobs for the current user (used by header badge)."""
+ try:
+ from modules.serviceCenter.services.serviceBackgroundJobs import listJobs
+ from modules.interfaces.interfaceDbApp import getRootInterface
+
+ rootIf = getRootInterface()
+ connections = rootIf.getUserConnections(currentUser.id)
+ connectionMap = {str(c.id): c for c in connections}
+ connectionIds = set(connectionMap.keys())
+
+ jobs = listJobs(jobType="connection.bootstrap", limit=50)
+ active = []
+ for j in jobs:
+ if j.get("status") not in ("PENDING", "RUNNING"):
+ continue
+ payload = j.get("payload") or {}
+ connId = payload.get("connectionId")
+ if connId in connectionIds:
+ conn = connectionMap[connId]
+ active.append({
+ "jobId": j["id"],
+ "connectionId": connId,
+ "connectionLabel": getattr(conn, "displayLabel", None) or getattr(conn, "authority", connId),
+ "jobType": j.get("jobType", "connection.bootstrap"),
+ "progress": j.get("progress", 0),
+ "progressMessage": j.get("progressMessage", ""),
+ })
+ return active
+ except Exception as e:
+ logger.error("Error in RAG inventory /jobs: %s", e, exc_info=True)
+ raise HTTPException(status_code=500, detail=str(e))
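
Taken together with _buildConnectionInventory, a GET /api/rag/inventory/me response has
roughly this shape (Python literal, all values illustrative):

    {
        "connections": [
            {
                "id": "conn-123",
                "authority": "google",
                "externalEmail": "user@example.com",
                "knowledgeIngestionEnabled": True,
                "preferences": {"schemaVersion": 1, "mailContentDepth": "snippet"},
                "dataSources": [
                    {"id": "ds-1", "label": "Projects", "path": "/Projects",
                     "sourceType": "googleDriveFolder", "ragIndexEnabled": True,
                     "neutralize": False, "lastIndexed": 1760000000.0, "chunkCount": 412},
                ],
                "totalChunks": 412,
                "runningJobs": [],
                "lastError": None,
            },
        ],
        "totals": {"chunks": 412},
    }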
diff --git a/modules/serviceCenter/services/serviceBackgroundJobs/__init__.py b/modules/serviceCenter/services/serviceBackgroundJobs/__init__.py
index e9d4c94c..ce67dc4a 100644
--- a/modules/serviceCenter/services/serviceBackgroundJobs/__init__.py
+++ b/modules/serviceCenter/services/serviceBackgroundJobs/__init__.py
@@ -7,6 +7,9 @@ from .mainBackgroundJobService import (
startJob,
getJobStatus,
listJobs,
+ cancelJob,
+ cancelJobsByConnection,
+ isTerminalStatus,
JobProgressCallback,
)
@@ -15,5 +18,8 @@ __all__ = [
"startJob",
"getJobStatus",
"listJobs",
+ "cancelJob",
+ "cancelJobsByConnection",
+ "isTerminalStatus",
"JobProgressCallback",
]
diff --git a/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py b/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py
index b8a55e28..66ca4708 100644
--- a/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py
+++ b/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py
@@ -30,6 +30,7 @@ clear message. No silent zombies.
import asyncio
import logging
+import time
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, List, Optional
@@ -49,7 +50,46 @@ JOBS_DATABASE = APP_CONFIG.get("DB_DATABASE", "poweron_app")
registerDatabase(JOBS_DATABASE)
-JobProgressCallback = Callable[[int, Optional[str]], None]
+_CANCEL_CHECK_INTERVAL_S = 3.0
+
+
+class JobProgressCallback:
+ """Callable progress reporter with cooperative cancel-check for long-running walkers."""
+
+ def __init__(self, jobId: str):
+ self._jobId = jobId
+ self._cancelledCache: Optional[bool] = None
+ self._lastCheckedAt: float = 0.0
+
+ def __call__(self, progress: int, message: Optional[str] = None) -> None:
+ try:
+ clamped = max(0, min(100, int(progress)))
+ fields: Dict[str, Any] = {"progress": clamped}
+ if message is not None:
+ fields["progressMessage"] = message[:500]
+ _updateJob(self._jobId, fields)
+ except Exception as ex:
+ logger.warning("Progress update failed for job %s: %s", self._jobId, ex)
+
+ def isCancelled(self) -> bool:
+ """Check if this job was cancelled. Reads DB at most every 3s to limit load."""
+ now = time.time()
+ if self._cancelledCache is True:
+ return True
+ if now - self._lastCheckedAt < _CANCEL_CHECK_INTERVAL_S:
+ return self._cancelledCache or False
+ self._lastCheckedAt = now
+ try:
+ job = _loadJob(self._jobId)
+ if job and job.get("status") == BackgroundJobStatusEnum.CANCELLED.value:
+ self._cancelledCache = True
+ return True
+ except Exception:
+ pass
+ self._cancelledCache = False
+ return False
+
+
JobHandler = Callable[[Dict[str, Any], JobProgressCallback], Awaitable[Optional[Dict[str, Any]]]]
@@ -155,16 +195,7 @@ def _markError(jobId: str, errorMessage: str) -> None:
def _makeProgressCallback(jobId: str) -> JobProgressCallback:
- def _cb(progress: int, message: Optional[str] = None) -> None:
- try:
- clamped = max(0, min(100, int(progress)))
- fields: Dict[str, Any] = {"progress": clamped}
- if message is not None:
- fields["progressMessage"] = message[:500]
- _updateJob(jobId, fields)
- except Exception as ex:
- logger.warning("Progress update failed for job %s: %s", jobId, ex)
- return _cb
+ return JobProgressCallback(jobId)
async def _runJob(jobId: str) -> None:
@@ -220,12 +251,51 @@ def isTerminalStatus(status: str) -> bool:
return status in {s.value for s in TERMINAL_JOB_STATUSES}
+def cancelJob(jobId: str, *, reason: str = "user_requested") -> bool:
+ """Mark a job as CANCELLED. Walkers detect this via JobProgressCallback.isCancelled().
+
+ Returns False if the job is already in a terminal state or does not exist.
+ """
+ job = _loadJob(jobId)
+ if not job:
+ return False
+ if isTerminalStatus(job.get("status", "")):
+ return False
+ _updateJob(jobId, {
+ "status": BackgroundJobStatusEnum.CANCELLED.value,
+ "errorMessage": f"cancelled: {reason}"[:1000],
+ "finishedAt": datetime.now(timezone.utc).timestamp(),
+ })
+ logger.info("BackgroundJob %s cancelled (reason=%s)", jobId, reason)
+ return True
+
+
+def cancelJobsByConnection(connectionId: str, *, jobType: str = "connection.bootstrap") -> int:
+ """Cancel all RUNNING/PENDING jobs whose payload.connectionId matches.
+
+ Returns count of jobs marked as cancelled.
+ """
+ db = _getDb()
+ rows = db.getRecordset(BackgroundJob, recordFilter={"jobType": jobType})
+ count = 0
+ for row in rows:
+ status = row.get("status", "")
+ if status not in (BackgroundJobStatusEnum.PENDING.value, BackgroundJobStatusEnum.RUNNING.value):
+ continue
+ payload = row.get("payload") or {}
+ if payload.get("connectionId") == connectionId:
+ if cancelJob(row["id"], reason=f"connection_stop:{connectionId[:8]}"):
+ count += 1
+ return count
+
+
def recoverInterruptedJobs() -> int:
- """Flip any RUNNING jobs to ERROR (called at worker boot).
+ """Flip any RUNNING jobs to ERROR and re-queue bootstrap jobs (called at worker boot).
A RUNNING job in the DB after process restart means the previous worker
died mid-execution; the asyncio task is gone and the job will never
- finish on its own.
+ finish on its own. For connection.bootstrap jobs, a fresh job is
+ automatically re-queued so the user doesn't have to manually retry.
"""
db = _getDb()
try:
@@ -234,12 +304,34 @@ def recoverInterruptedJobs() -> int:
logger.warning("recoverInterruptedJobs: failed to scan RUNNING jobs: %s", ex)
return 0
count = 0
+ requeued = 0
for row in rows:
try:
_markError(row["id"], "Interrupted by worker restart")
count += 1
except Exception as ex:
logger.warning("recoverInterruptedJobs: could not mark %s as ERROR: %s", row.get("id"), ex)
+ continue
+
+ if row.get("jobType") == "connection.bootstrap":
+ payload = row.get("payload") or {}
+ if payload.get("connectionId"):
+ try:
+ newJob = BackgroundJob(
+ jobType="connection.bootstrap",
+ payload=payload,
+ triggeredBy="recovery.requeue",
+ )
+ record = db.recordCreate(BackgroundJob, _serialiseDatetimes(newJob.model_dump()))
+ asyncio.create_task(_runJob(record["id"]))
+ requeued += 1
+ logger.info(
+ "recoverInterruptedJobs: re-queued bootstrap for connectionId=%s (new jobId=%s)",
+ payload["connectionId"], record["id"],
+ )
+ except Exception as reqEx:
+ logger.warning("recoverInterruptedJobs: re-queue failed for %s: %s", row.get("id"), reqEx)
+
if count:
- logger.warning("Recovered %d interrupted background job(s) after restart", count)
+ logger.warning("Recovered %d interrupted background job(s) after restart (re-queued %d)", count, requeued)
return count
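
For walker authors, cooperative cancellation means polling the callback inside the main
iteration loop. A sketch of the expected handler pattern; _listFolders and _ingestFolder
are hypothetical stand-ins for a connector's fetch and ingest steps:

    async def _walkFolders(job, progressCb):
        folders = await _listFolders(job["payload"]["connectionId"])
        for i, folder in enumerate(folders):
            if progressCb.isCancelled():          # cheap: hits the DB at most every 3 s
                return {"cancelled": True, "processed": i}
            await _ingestFolder(folder)
            progressCb(int(100 * (i + 1) / len(folders)), f"indexed {folder['name']}")
        return {"processed": len(folders)}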
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py
index 97ac61d5..0e2d251f 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py
@@ -122,21 +122,54 @@ def _onConnectionRevoked(
)
+_SOURCE_TYPE_MAP = {
+ "msft": {
+ "sharepoint": ("sharepointFolder", "onedriveFolder"),
+ "outlook": ("outlookFolder", "calendarFolder", "contactFolder"),
+ },
+ "google": {
+ "drive": ("googleDriveFolder",),
+ "gmail": ("gmailFolder",),
+ },
+ "clickup": {
+ "clickup": ("clickupList",),
+ },
+ "infomaniak": {
+ "kdrive": ("kdriveFolder",),
+ },
+}
+
+
+def _loadRagEnabledDataSources(connectionId: str, dataSourceIds: Optional[list] = None):
+ """Load DataSource rows with ragIndexEnabled=true for a connection.
+
+ If dataSourceIds is provided (mini-bootstrap), filter to only those IDs.
+ """
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.datamodels.datamodelDataSource import DataSource
+
+ rootIf = getRootInterface()
+ allDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId})
+ if dataSourceIds:
+ return [ds for ds in allDs if ds.get("id") in dataSourceIds and ds.get("ragIndexEnabled")]
+ return [ds for ds in allDs if ds.get("ragIndexEnabled")]
+
+
async def _bootstrapJobHandler(
job: Dict[str, Any],
progressCb,
) -> Dict[str, Any]:
- """Dispatch bootstrap by authority. Each authority runs its own sub-bootstraps."""
+ """Dispatch bootstrap by authority, iterating only over ragIndexEnabled DataSources."""
payload = job.get("payload") or {}
connectionId = payload.get("connectionId")
authority = (payload.get("authority") or "").lower()
+ dataSourceIds = payload.get("dataSourceIds")
if not connectionId:
raise ValueError("connection.bootstrap requires payload.connectionId")
progressCb(5, f"resolving {authority} connection")
- # Defensive consent check: if the connection has since disabled knowledge ingestion
- # (e.g. user toggled setting after the job was enqueued), skip all walkers.
+ # Defensive consent check: skip all walkers if ingestion was disabled after the job was enqueued.
try:
from modules.interfaces.interfaceDbApp import getRootInterface
_root = getRootInterface()
@@ -156,6 +189,21 @@ async def _bootstrapJobHandler(
except Exception as _guardErr:
logger.debug("Could not load connection for consent guard: %s", _guardErr)
+ # Load only ragIndexEnabled DataSources for this connection
+ dataSources = _loadRagEnabledDataSources(connectionId, dataSourceIds)
+ if not dataSources:
+ logger.info(
+ "ingestion.connection.bootstrap.skipped — no rag-enabled DataSources connectionId=%s",
+ connectionId,
+ extra={
+ "event": "ingestion.connection.bootstrap.skipped",
+ "connectionId": connectionId,
+ "authority": authority,
+ "reason": "no_data_sources",
+ },
+ )
+ return {"connectionId": connectionId, "authority": authority, "skipped": True, "reason": "no_data_sources"}
+
def _normalize(res: Any, label: str) -> Dict[str, Any]:
if isinstance(res, Exception):
logger.error(
@@ -165,6 +213,10 @@ async def _bootstrapJobHandler(
return {"error": str(res)}
return res or {}
+ def _filterDs(walkerKey: str) -> list:
+ sourceTypes = _SOURCE_TYPE_MAP.get(authority, {}).get(walkerKey, ())
+ return [ds for ds in dataSources if ds.get("sourceType") in sourceTypes]
+
if authority == "msft":
from modules.serviceCenter.services.serviceKnowledge.subConnectorSyncSharepoint import (
bootstrapSharepoint,
@@ -174,9 +226,14 @@ async def _bootstrapJobHandler(
)
progressCb(10, "sharepoint + outlook")
+ spDs = _filterDs("sharepoint")
+ olDs = _filterDs("outlook")
+ async def _noopResult():
+ return {"skipped": True, "reason": "no_datasources"}
+
spResult, olResult = await asyncio.gather(
- bootstrapSharepoint(connectionId=connectionId, progressCb=progressCb),
- bootstrapOutlook(connectionId=connectionId, progressCb=progressCb),
+ bootstrapSharepoint(connectionId=connectionId, progressCb=progressCb, dataSources=spDs) if spDs else _noopResult(),
+ bootstrapOutlook(connectionId=connectionId, progressCb=progressCb, dataSources=olDs) if olDs else _noopResult(),
return_exceptions=True,
)
return {
@@ -195,9 +252,14 @@ async def _bootstrapJobHandler(
)
progressCb(10, "drive + gmail")
+ gdDs = _filterDs("drive")
+ gmDs = _filterDs("gmail")
+ async def _noopResult():
+ return {"skipped": True, "reason": "no_datasources"}
+
gdResult, gmResult = await asyncio.gather(
- bootstrapGdrive(connectionId=connectionId, progressCb=progressCb),
- bootstrapGmail(connectionId=connectionId, progressCb=progressCb),
+ bootstrapGdrive(connectionId=connectionId, progressCb=progressCb, dataSources=gdDs) if gdDs else _noopResult(),
+ bootstrapGmail(connectionId=connectionId, progressCb=progressCb, dataSources=gmDs) if gmDs else _noopResult(),
return_exceptions=True,
)
return {
@@ -213,7 +275,8 @@ async def _bootstrapJobHandler(
)
progressCb(10, "clickup tasks")
- cuResult = await bootstrapClickup(connectionId=connectionId, progressCb=progressCb)
+ cuDs = _filterDs("clickup")
+    if cuDs:
+        cuResult = await bootstrapClickup(connectionId=connectionId, progressCb=progressCb, dataSources=cuDs)
+    else:
+        cuResult = {"skipped": True, "reason": "no_datasources"}
return {
"connectionId": connectionId,
"authority": authority,
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorPrefs.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorPrefs.py
index 950400ce..4aaaa9bf 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorPrefs.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorPrefs.py
@@ -9,7 +9,7 @@ is None).
from __future__ import annotations
import logging
-from dataclasses import dataclass, field
+from dataclasses import dataclass
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@@ -21,10 +21,11 @@ _DEFAULT_CLICKUP_SCOPE = "title_description"
@dataclass
class ConnectionIngestionPrefs:
- """Parsed per-connection preferences for knowledge ingestion walkers."""
+ """Parsed per-connection preferences for knowledge ingestion walkers.
- # PII
- neutralizeBeforeEmbed: bool = False
+ Neutralization is now controlled per DataSource.neutralize (not here).
+    Surface toggles are obsolete; walkers now iterate only over ragIndexEnabled DataSources.
+ """
# Mail (Outlook + Gmail)
mailContentDepth: str = _DEFAULT_MAIL_DEPTH # "metadata" | "snippet" | "full"
@@ -32,18 +33,11 @@ class ConnectionIngestionPrefs:
# Files (Drive / SharePoint / OneDrive)
filesIndexBinaries: bool = True
- mimeAllowlist: List[str] = field(default_factory=list) # empty = all allowed
# ClickUp
clickupScope: str = _DEFAULT_CLICKUP_SCOPE # "titles" | "title_description" | "with_comments"
clickupIndexAttachments: bool = False
- # Per-authority surface toggles (default everything on)
- gmailEnabled: bool = True
- driveEnabled: bool = True
- sharepointEnabled: bool = True
- outlookEnabled: bool = True
-
# Time window
maxAgeDays: int = _DEFAULT_MAX_AGE_DAYS # 0 = no limit
@@ -78,22 +72,12 @@ def loadConnectionPrefs(connectionId: str) -> ConnectionIngestionPrefs:
v = raw.get(key)
return int(v) if isinstance(v, int) else default
- surface = raw.get("surfaceToggles") or {}
- google_surf = surface.get("google") or {}
- msft_surf = surface.get("msft") or {}
-
return ConnectionIngestionPrefs(
- neutralizeBeforeEmbed=_bool("neutralizeBeforeEmbed", False),
mailContentDepth=_str("mailContentDepth", ["metadata", "snippet", "full"], _DEFAULT_MAIL_DEPTH),
mailIndexAttachments=_bool("mailIndexAttachments", False),
filesIndexBinaries=_bool("filesIndexBinaries", True),
- mimeAllowlist=list(raw.get("mimeAllowlist") or []),
clickupScope=_str("clickupScope", ["titles", "title_description", "with_comments"], _DEFAULT_CLICKUP_SCOPE),
clickupIndexAttachments=_bool("clickupIndexAttachments", False),
- gmailEnabled=bool(google_surf.get("gmail", True)),
- driveEnabled=bool(google_surf.get("drive", True)),
- sharepointEnabled=bool(msft_surf.get("sharepoint", True)),
- outlookEnabled=bool(msft_surf.get("outlook", True)),
maxAgeDays=_int("maxAgeDays", _DEFAULT_MAX_AGE_DAYS),
)
except Exception as exc:
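
Editorial note: after this change `loadConnectionPrefs` parses a flat prefs document without PII or surface-toggle keys. A sketch of the raw record it now expects (key names taken from the parser above; the values are example choices):

    rawPrefs = {
        "mailContentDepth": "snippet",        # "metadata" | "snippet" | "full"
        "mailIndexAttachments": False,
        "filesIndexBinaries": True,
        "clickupScope": "title_description",  # "titles" | "title_description" | "with_comments"
        "clickupIndexAttachments": False,
        "maxAgeDays": 365,                    # 0 = no limit
    }
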
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py
index 31ac9687..7acbaa19 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py
@@ -23,7 +23,7 @@ import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
-from typing import Any, Callable, Dict, List, Optional
+from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@@ -150,8 +150,6 @@ def _buildContentObjects(task: Dict[str, Any], limits: ClickupBootstrapLimits) -
"data": description,
"contextRef": {"part": "description"},
})
- # text_content is ClickUp's rendered-markdown version; include if it adds
- # something beyond the plain description (common for bullet lists, checklists).
textContent = _truncate(task.get("text_content"), limits.maxDescriptionChars)
if textContent and textContent != description:
parts.append({
@@ -166,33 +164,35 @@ def _buildContentObjects(task: Dict[str, Any], limits: ClickupBootstrapLimits) -
async def bootstrapClickup(
connectionId: str,
*,
- progressCb: Optional[Callable[[int, Optional[str]], None]] = None,
+ dataSources: Optional[List[Dict[str, Any]]] = None,
+ progressCb: Optional[Any] = None,
adapter: Any = None,
connection: Any = None,
knowledgeService: Any = None,
limits: Optional[ClickupBootstrapLimits] = None,
) -> Dict[str, Any]:
- """Walk workspaces → lists → tasks and ingest each task as a virtual doc."""
- from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs
- prefs = loadConnectionPrefs(connectionId)
+ """Walk workspaces → lists → tasks and ingest each task as a virtual doc.
+
+ Iterates only over explicitly provided dataSources (ragIndexEnabled=true).
+ Each DataSource defines the neutralize policy for its subtree.
+ """
+ if not dataSources:
+ return {"connectionId": connectionId, "skipped": True, "reason": "no_datasources"}
if not limits:
- limits = ClickupBootstrapLimits(
- maxAgeDays=prefs.maxAgeDays if prefs.maxAgeDays > 0 else None,
- neutralize=prefs.neutralizeBeforeEmbed,
- clickupScope=prefs.clickupScope,
- )
+ limits = ClickupBootstrapLimits()
startMs = time.time()
result = ClickupBootstrapResult(connectionId=connectionId)
logger.info(
- "ingestion.connection.bootstrap.started part=clickup connectionId=%s",
- connectionId,
+ "ingestion.connection.bootstrap.started part=clickup connectionId=%s dataSources=%d",
+ connectionId, len(dataSources),
extra={
"event": "ingestion.connection.bootstrap.started",
"part": "clickup",
"connectionId": connectionId,
+ "dataSourceCount": len(dataSources),
},
)
@@ -215,30 +215,56 @@ async def bootstrapClickup(
return _finalizeResult(connectionId, result, startMs)
teams = (teamsResp or {}).get("teams") or []
- for team in teams[: limits.maxWorkspaces]:
+
+ cancelled = False
+ for ds in dataSources:
if result.indexed + result.skippedDuplicate >= limits.maxTasks:
break
- teamId = str(team.get("id", "") or "")
- if not teamId:
- continue
- result.workspaces += 1
- try:
- await _walkTeam(
- svc=svc,
- knowledgeService=knowledgeService,
- connectionId=connectionId,
- mandateId=mandateId,
- userId=userId,
- team=team,
- limits=limits,
- result=result,
- progressCb=progressCb,
- )
- except Exception as exc:
- logger.error("clickup team %s walk failed: %s", teamId, exc, exc_info=True)
- result.errors.append(f"team({teamId}): {exc}")
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ cancelled = True
+ break
- return _finalizeResult(connectionId, result, startMs)
+ dsId = ds.get("id", "")
+ dsNeutralize = ds.get("neutralize", False)
+ dsLimits = ClickupBootstrapLimits(
+ maxTasks=limits.maxTasks,
+ maxWorkspaces=limits.maxWorkspaces,
+ maxListsPerWorkspace=limits.maxListsPerWorkspace,
+ maxDescriptionChars=limits.maxDescriptionChars,
+ maxAgeDays=limits.maxAgeDays,
+ includeClosed=limits.includeClosed,
+ neutralize=dsNeutralize,
+ clickupScope=limits.clickupScope,
+ )
+
+ for team in teams[:dsLimits.maxWorkspaces]:
+ if result.indexed + result.skippedDuplicate >= dsLimits.maxTasks:
+ break
+ teamId = str(team.get("id", "") or "")
+ if not teamId:
+ continue
+ result.workspaces += 1
+ try:
+ await _walkTeam(
+ svc=svc,
+ knowledgeService=knowledgeService,
+ connectionId=connectionId,
+ mandateId=mandateId,
+ userId=userId,
+ team=team,
+ limits=dsLimits,
+ result=result,
+ progressCb=progressCb,
+ dataSourceId=dsId,
+ )
+ except Exception as exc:
+ logger.error("clickup team %s walk failed: %s", teamId, exc, exc_info=True)
+ result.errors.append(f"team({teamId}): {exc}")
+
+ finalResult = _finalizeResult(connectionId, result, startMs)
+ if cancelled:
+ finalResult["cancelled"] = True
+ return finalResult
async def _resolveDependencies(connectionId: str):
@@ -280,8 +306,12 @@ async def _walkTeam(
team: Dict[str, Any],
limits: ClickupBootstrapLimits,
result: ClickupBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ return
+
teamId = str(team.get("id", "") or "")
spacesResp = await svc.getSpaces(teamId)
spaces = (spacesResp or {}).get("spaces") or []
@@ -294,14 +324,12 @@ async def _walkTeam(
if not spaceId:
continue
- # Folderless lists directly under the space
folderless = await svc.getFolderlessLists(spaceId)
for lst in (folderless or {}).get("lists") or []:
if len(listsCollected) >= limits.maxListsPerWorkspace:
break
listsCollected.append({**lst, "_space": space})
- # Lists inside folders
foldersResp = await svc.getFolders(spaceId)
for folder in (foldersResp or {}).get("folders") or []:
if len(listsCollected) >= limits.maxListsPerWorkspace:
@@ -318,6 +346,8 @@ async def _walkTeam(
for lst in listsCollected:
if result.indexed + result.skippedDuplicate >= limits.maxTasks:
return
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ return
result.lists += 1
await _walkList(
svc=svc,
@@ -330,6 +360,7 @@ async def _walkTeam(
limits=limits,
result=result,
progressCb=progressCb,
+ dataSourceId=dataSourceId,
)
@@ -344,13 +375,16 @@ async def _walkList(
lst: Dict[str, Any],
limits: ClickupBootstrapLimits,
result: ClickupBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
listId = str(lst.get("id", "") or "")
if not listId:
return
page = 0
while result.indexed + result.skippedDuplicate < limits.maxTasks:
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ return
resp = await svc.getTasksInList(
listId,
page=page,
@@ -371,7 +405,6 @@ async def _walkList(
if not _isRecent(task.get("date_updated"), limits.maxAgeDays):
result.skippedPolicy += 1
continue
- # Inject the list/folder/space metadata we already loaded.
task["list"] = task.get("list") or {"id": listId, "name": lst.get("name")}
task["folder"] = task.get("folder") or lst.get("_folder") or {}
task["space"] = task.get("space") or lst.get("_space") or {}
@@ -385,9 +418,10 @@ async def _walkList(
limits=limits,
result=result,
progressCb=progressCb,
+ dataSourceId=dataSourceId,
)
- if len(tasks) < 100: # ClickUp page-size hint: fewer than 100 => last page
+ if len(tasks) < 100:
return
page += 1
@@ -402,7 +436,8 @@ async def _ingestTask(
task: Dict[str, Any],
limits: ClickupBootstrapLimits,
result: ClickupBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
@@ -431,6 +466,7 @@ async def _ingestTask(
neutralize=limits.neutralize,
provenance={
"connectionId": connectionId,
+ "dataSourceId": dataSourceId,
"authority": "clickup",
"service": "clickup",
"externalItemId": taskId,
@@ -456,8 +492,10 @@ async def _ingestTask(
else:
result.failed += 1
- if progressCb is not None and (result.indexed + result.skippedDuplicate) % 50 == 0:
- processed = result.indexed + result.skippedDuplicate
+ processed = result.indexed + result.skippedDuplicate
+ if progressCb is not None and processed % 50 == 0:
+ if hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ return
try:
progressCb(
min(90, 10 + int(80 * processed / max(1, limits.maxTasks))),
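
Editorial note: the per-DataSource limits clone above copies every field by hand so that only `neutralize` differs from the connection-level limits. A behaviour-equivalent sketch using `dataclasses.replace`, offered as an editorial alternative rather than a change in this patch, which also stays correct if fields are later added to the dataclass:

    import dataclasses

    # Keep all connection-level caps, override only the per-DataSource policy.
    dsLimits = dataclasses.replace(limits, neutralize=dsNeutralize)
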
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py
index 5e4e659b..398b9af9 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py
@@ -12,6 +12,7 @@ via export), runs the standard extraction pipeline and routes results through
from __future__ import annotations
+import asyncio
import hashlib
import logging
import time
@@ -30,7 +31,6 @@ SKIP_MIME_PREFIXES_DEFAULT = ("video/", "audio/")
MAX_DEPTH_DEFAULT = 4
MAX_AGE_DAYS_DEFAULT = 365
-# Google Drive uses virtual mime-types for folders and non-downloadable assets.
FOLDER_MIME = "application/vnd.google-apps.folder"
@@ -41,12 +41,8 @@ class GdriveBootstrapLimits:
maxFileSize: int = MAX_FILE_SIZE_DEFAULT
skipMimePrefixes: tuple = SKIP_MIME_PREFIXES_DEFAULT
maxDepth: int = MAX_DEPTH_DEFAULT
- # Only ingest files modified within the last N days. None disables filter.
maxAgeDays: Optional[int] = MAX_AGE_DAYS_DEFAULT
- # Pass-through to IngestionJob.neutralize
neutralize: bool = False
- # Whether to skip binary/non-text files
- filesIndexBinaries: bool = True
@dataclass
@@ -95,10 +91,8 @@ def _isRecent(modifiedIso: Optional[str], maxAgeDays: Optional[int]) -> bool:
if not maxAgeDays:
return True
if not modifiedIso:
- # No timestamp -> be permissive (Drive native docs sometimes omit it on export).
return True
try:
- # Google returns RFC 3339 with `Z` or offset; python 3.11+ parses both.
ts = datetime.fromisoformat(modifiedIso.replace("Z", "+00:00"))
except Exception:
return True
@@ -111,34 +105,36 @@ def _isRecent(modifiedIso: Optional[str], maxAgeDays: Optional[int]) -> bool:
async def bootstrapGdrive(
connectionId: str,
*,
- progressCb: Optional[Callable[[int, Optional[str]], None]] = None,
+ dataSources: Optional[List[Dict[str, Any]]] = None,
+ progressCb: Optional[Any] = None,
adapter: Any = None,
connection: Any = None,
knowledgeService: Any = None,
limits: Optional[GdriveBootstrapLimits] = None,
runExtractionFn: Optional[Callable[..., Any]] = None,
) -> Dict[str, Any]:
- """Walk My Drive starting from the virtual root folder."""
- from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs
- prefs = loadConnectionPrefs(connectionId)
+ """Walk My Drive starting from the virtual root folder.
+
+ Iterates only over explicitly provided dataSources (ragIndexEnabled=true).
+ Each DataSource defines the root path + neutralize policy for its subtree.
+ """
+ if not dataSources:
+ return {"connectionId": connectionId, "skipped": True, "reason": "no_datasources"}
if not limits:
- limits = GdriveBootstrapLimits(
- maxAgeDays=prefs.maxAgeDays if prefs.maxAgeDays > 0 else None,
- neutralize=prefs.neutralizeBeforeEmbed,
- filesIndexBinaries=prefs.filesIndexBinaries,
- )
+ limits = GdriveBootstrapLimits()
startMs = time.time()
result = GdriveBootstrapResult(connectionId=connectionId)
logger.info(
- "ingestion.connection.bootstrap.started part=gdrive connectionId=%s",
- connectionId,
+ "ingestion.connection.bootstrap.started part=gdrive connectionId=%s dataSources=%d",
+ connectionId, len(dataSources),
extra={
"event": "ingestion.connection.bootstrap.started",
"part": "gdrive",
"connectionId": connectionId,
+ "dataSourceCount": len(dataSources),
},
)
@@ -158,25 +154,51 @@ async def bootstrapGdrive(
mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else ""
userId = str(getattr(connection, "userId", "") or "") if connection is not None else ""
- try:
- await _walkFolder(
- adapter=adapter,
- knowledgeService=knowledgeService,
- runExtractionFn=runExtractionFn,
- connectionId=connectionId,
- mandateId=mandateId,
- userId=userId,
- folderPath="/", # DriveAdapter.browse maps "" / "/" -> "root"
- depth=0,
- limits=limits,
- result=result,
- progressCb=progressCb,
- )
- except Exception as exc:
- logger.error("gdrive walk failed for %s: %s", connectionId, exc, exc_info=True)
- result.errors.append(f"walk: {exc}")
+ cancelled = False
+ for ds in dataSources:
+ if result.indexed + result.skippedDuplicate >= limits.maxItems:
+ break
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ cancelled = True
+ break
- return _finalizeResult(connectionId, result, startMs)
+ dsPath = ds.get("path", "/")
+ dsId = ds.get("id", "")
+ dsNeutralize = ds.get("neutralize", False)
+ dsMaxAgeDays = ds.get("maxAgeDays", limits.maxAgeDays)
+ dsLimits = GdriveBootstrapLimits(
+ maxItems=limits.maxItems,
+ maxBytes=limits.maxBytes,
+ maxFileSize=limits.maxFileSize,
+ skipMimePrefixes=limits.skipMimePrefixes,
+ maxDepth=limits.maxDepth,
+ maxAgeDays=dsMaxAgeDays,
+ neutralize=dsNeutralize,
+ )
+
+ try:
+ await _walkFolder(
+ adapter=adapter,
+ knowledgeService=knowledgeService,
+ runExtractionFn=runExtractionFn,
+ connectionId=connectionId,
+ mandateId=mandateId,
+ userId=userId,
+ folderPath=dsPath,
+ depth=0,
+ limits=dsLimits,
+ result=result,
+ progressCb=progressCb,
+ dataSourceId=dsId,
+ )
+ except Exception as exc:
+ logger.error("gdrive walk failed for ds %s path %s: %s", dsId, dsPath, exc, exc_info=True)
+ result.errors.append(f"walk({dsPath}): {exc}")
+
+ finalResult = _finalizeResult(connectionId, result, startMs)
+ if cancelled:
+ finalResult["cancelled"] = True
+ return finalResult
async def _resolveDependencies(connectionId: str):
@@ -220,10 +242,13 @@ async def _walkFolder(
depth: int,
limits: GdriveBootstrapLimits,
result: GdriveBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
if depth > limits.maxDepth:
return
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ return
try:
entries = await adapter.browse(folderPath)
except Exception as exc:
@@ -236,6 +261,8 @@ async def _walkFolder(
return
if result.bytesProcessed >= limits.maxBytes:
return
+            # Throttled probe: check cancellation at most once per 50 items.
+            if progressCb and hasattr(progressCb, "isCancelled") and (result.indexed + result.skippedDuplicate) % 50 == 0 and progressCb.isCancelled():
+                return
entryPath = getattr(entry, "path", "") or ""
metadata = getattr(entry, "metadata", {}) or {}
@@ -254,6 +281,7 @@ async def _walkFolder(
limits=limits,
result=result,
progressCb=progressCb,
+ dataSourceId=dataSourceId,
)
continue
@@ -288,6 +316,7 @@ async def _walkFolder(
limits=limits,
result=result,
progressCb=progressCb,
+ dataSourceId=dataSourceId,
)
@@ -306,7 +335,8 @@ async def _ingestOne(
revision: Optional[str],
limits: GdriveBootstrapLimits,
result: GdriveBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
@@ -321,14 +351,13 @@ async def _ingestOne(
result.errors.append(f"download({entryPath}): {exc}")
return
- # Adapter.download returns raw bytes today; guard DownloadResult shape too.
fileBytes: bytes
if isinstance(downloaded, (bytes, bytearray)):
fileBytes = bytes(downloaded)
else:
fileBytes = bytes(getattr(downloaded, "data", b"") or b"")
if getattr(downloaded, "mimeType", None):
- mimeType = downloaded.mimeType # export may have changed the type
+ mimeType = downloaded.mimeType
if not fileBytes:
result.failed += 1
return
@@ -354,6 +383,15 @@ async def _ingestOne(
result.skippedPolicy += 1
return
+ provenance: Dict[str, Any] = {
+ "connectionId": connectionId,
+ "dataSourceId": dataSourceId,
+ "authority": "google",
+ "service": "drive",
+ "externalItemId": externalItemId,
+ "entryPath": entryPath,
+ "tier": "body",
+ }
try:
handle = await knowledgeService.requestIngestion(
IngestionJob(
@@ -366,14 +404,7 @@ async def _ingestOne(
contentObjects=contentObjects,
contentVersion=revision,
neutralize=limits.neutralize,
- provenance={
- "connectionId": connectionId,
- "authority": "google",
- "service": "drive",
- "externalItemId": externalItemId,
- "entryPath": entryPath,
- "tier": "body",
- },
+ provenance=provenance,
)
)
except Exception as exc:
@@ -388,6 +419,8 @@ async def _ingestOne(
result.indexed += 1
else:
result.failed += 1
+ if handle.error:
+ result.errors.append(f"ingest({entryPath}): {handle.error}")
if progressCb is not None and (result.indexed + result.skippedDuplicate) % 50 == 0:
processed = result.indexed + result.skippedDuplicate
@@ -411,6 +444,8 @@ async def _ingestOne(
},
)
+ await asyncio.sleep(0)
+
def _finalizeResult(connectionId: str, result: GdriveBootstrapResult, startMs: float) -> Dict[str, Any]:
durationMs = int((time.time() - startMs) * 1000)
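
Editorial note: every walker duck-types the same progress callback: a callable taking (percent, label) that may optionally expose an `isCancelled()` method, probed via `hasattr`. A minimal conforming implementation (hypothetical, for illustration):

    class CancellableProgress:
        """Callable progress reporter with the optional isCancelled() hook
        the walkers probe via hasattr()."""

        def __init__(self):
            self._cancelled = False

        def __call__(self, percent, label=None):
            print(f"[{percent:3d}%] {label or ''}")

        def cancel(self):
            self._cancelled = True

        def isCancelled(self):
            return self._cancelled
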
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py
index 21fec83d..f5c345c6 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py
@@ -175,35 +175,36 @@ def _buildContentObjects(
async def bootstrapGmail(
connectionId: str,
*,
- progressCb: Optional[Callable[[int, Optional[str]], None]] = None,
+ dataSources: Optional[List[Dict[str, Any]]] = None,
+ progressCb: Optional[Any] = None,
adapter: Any = None,
connection: Any = None,
knowledgeService: Any = None,
limits: Optional[GmailBootstrapLimits] = None,
googleGetFn: Optional[Callable[..., Any]] = None,
) -> Dict[str, Any]:
- """Enumerate Gmail labels (INBOX + SENT default) and ingest messages."""
- from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs
- prefs = loadConnectionPrefs(connectionId)
+ """Enumerate Gmail labels (INBOX + SENT default) and ingest messages.
+
+ Iterates only over explicitly provided dataSources (ragIndexEnabled=true).
+ Each DataSource defines the neutralize policy for its scope.
+ """
+ if not dataSources:
+ return {"connectionId": connectionId, "skipped": True, "reason": "no_datasources"}
if not limits:
- limits = GmailBootstrapLimits(
- includeAttachments=prefs.mailIndexAttachments,
- maxAgeDays=prefs.maxAgeDays if prefs.maxAgeDays > 0 else None,
- mailContentDepth=prefs.mailContentDepth,
- neutralize=prefs.neutralizeBeforeEmbed,
- )
+ limits = GmailBootstrapLimits()
startMs = time.time()
result = GmailBootstrapResult(connectionId=connectionId)
logger.info(
- "ingestion.connection.bootstrap.started part=gmail connectionId=%s",
- connectionId,
+ "ingestion.connection.bootstrap.started part=gmail connectionId=%s dataSources=%d",
+ connectionId, len(dataSources),
extra={
"event": "ingestion.connection.bootstrap.started",
"part": "gmail",
"connectionId": connectionId,
+ "dataSourceCount": len(dataSources),
},
)
@@ -221,26 +222,51 @@ async def bootstrapGmail(
mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else ""
userId = str(getattr(connection, "userId", "") or "") if connection is not None else ""
- for labelId in limits.labels:
+ cancelled = False
+ for ds in dataSources:
if result.indexed + result.skippedDuplicate >= limits.maxMessages:
break
- try:
- await _ingestLabel(
- googleGetFn=googleGetFn,
- knowledgeService=knowledgeService,
- connectionId=connectionId,
- mandateId=mandateId,
- userId=userId,
- labelId=labelId,
- limits=limits,
- result=result,
- progressCb=progressCb,
- )
- except Exception as exc:
- logger.error("gmail ingestion label %s failed: %s", labelId, exc, exc_info=True)
- result.errors.append(f"label({labelId}): {exc}")
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ cancelled = True
+ break
- return _finalizeResult(connectionId, result, startMs)
+ dsId = ds.get("id", "")
+ dsNeutralize = ds.get("neutralize", False)
+ dsLimits = GmailBootstrapLimits(
+ maxMessages=limits.maxMessages,
+ labels=limits.labels,
+ maxBodyChars=limits.maxBodyChars,
+ includeAttachments=limits.includeAttachments,
+ maxAttachmentBytes=limits.maxAttachmentBytes,
+ maxAgeDays=limits.maxAgeDays,
+ mailContentDepth=limits.mailContentDepth,
+ neutralize=dsNeutralize,
+ )
+
+ for labelId in dsLimits.labels:
+ if result.indexed + result.skippedDuplicate >= dsLimits.maxMessages:
+ break
+ try:
+ await _ingestLabel(
+ googleGetFn=googleGetFn,
+ knowledgeService=knowledgeService,
+ connectionId=connectionId,
+ mandateId=mandateId,
+ userId=userId,
+ labelId=labelId,
+ limits=dsLimits,
+ result=result,
+ progressCb=progressCb,
+ dataSourceId=dsId,
+ )
+ except Exception as exc:
+ logger.error("gmail ingestion label %s failed: %s", labelId, exc, exc_info=True)
+ result.errors.append(f"label({labelId}): {exc}")
+
+ finalResult = _finalizeResult(connectionId, result, startMs)
+ if cancelled:
+ finalResult["cancelled"] = True
+ return finalResult
async def _resolveDependencies(connectionId: str):
@@ -282,7 +308,8 @@ async def _ingestLabel(
labelId: str,
limits: GmailBootstrapLimits,
result: GmailBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
remaining = limits.maxMessages - (result.indexed + result.skippedDuplicate)
if remaining <= 0:
@@ -316,6 +343,8 @@ async def _ingestLabel(
for stub in messageStubs:
if result.indexed + result.skippedDuplicate >= limits.maxMessages:
break
+ if progressCb and hasattr(progressCb, "isCancelled") and (result.indexed + result.skippedDuplicate) % 50 == 0 and progressCb.isCancelled():
+ return
msgId = stub.get("id")
if not msgId:
continue
@@ -337,6 +366,7 @@ async def _ingestLabel(
limits=limits,
result=result,
progressCb=progressCb,
+ dataSourceId=dataSourceId,
)
nextPageToken = page.get("nextPageToken")
@@ -355,7 +385,8 @@ async def _ingestMessage(
message: Dict[str, Any],
limits: GmailBootstrapLimits,
result: GmailBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
@@ -386,6 +417,7 @@ async def _ingestMessage(
neutralize=limits.neutralize,
provenance={
"connectionId": connectionId,
+ "dataSourceId": dataSourceId,
"authority": "google",
"service": "gmail",
"externalItemId": messageId,
@@ -420,6 +452,7 @@ async def _ingestMessage(
parentSyntheticId=syntheticId,
limits=limits,
result=result,
+ dataSourceId=dataSourceId,
)
except Exception as exc:
logger.warning("gmail attachments %s failed: %s", messageId, exc)
@@ -461,6 +494,7 @@ async def _ingestAttachments(
parentSyntheticId: str,
limits: GmailBootstrapLimits,
result: GmailBootstrapResult,
+ dataSourceId: str = "",
) -> None:
"""Child ingestion jobs for file attachments. Skips inline images (cid: refs)."""
from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
@@ -561,6 +595,7 @@ async def _ingestAttachments(
contentObjects=contentObjects,
provenance={
"connectionId": connectionId,
+ "dataSourceId": dataSourceId,
"authority": "google",
"service": "gmail",
"parentId": parentSyntheticId,
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py
index 64a3545f..3f4a8afb 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py
@@ -18,7 +18,7 @@ import hashlib
import logging
import time
from dataclasses import dataclass, field
-from typing import Any, Callable, Dict, List, Optional
+from typing import Any, Dict, List, Optional
from modules.serviceCenter.services.serviceKnowledge.subTextClean import cleanEmailBody
@@ -139,34 +139,35 @@ def _buildContentObjects(
async def bootstrapOutlook(
connectionId: str,
*,
- progressCb: Optional[Callable[[int, Optional[str]], None]] = None,
+ dataSources: Optional[List[Dict[str, Any]]] = None,
+ progressCb: Optional[Any] = None,
adapter: Any = None,
connection: Any = None,
knowledgeService: Any = None,
limits: Optional[OutlookBootstrapLimits] = None,
) -> Dict[str, Any]:
- """Enumerate Outlook folders (inbox + sent by default) and ingest messages."""
- from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs
- prefs = loadConnectionPrefs(connectionId)
+ """Enumerate Outlook folders (inbox + sent by default) and ingest messages.
+
+ Iterates only over explicitly provided dataSources (ragIndexEnabled=true).
+ Each DataSource defines the neutralize policy for its messages.
+ """
+ if not dataSources:
+ return {"connectionId": connectionId, "skipped": True, "reason": "no_datasources"}
if not limits:
- limits = OutlookBootstrapLimits(
- includeAttachments=prefs.mailIndexAttachments,
- maxAgeDays=prefs.maxAgeDays if prefs.maxAgeDays > 0 else None,
- mailContentDepth=prefs.mailContentDepth,
- neutralize=prefs.neutralizeBeforeEmbed,
- )
+ limits = OutlookBootstrapLimits()
startMs = time.time()
result = OutlookBootstrapResult(connectionId=connectionId)
logger.info(
- "ingestion.connection.bootstrap.started part=outlook connectionId=%s",
- connectionId,
+ "ingestion.connection.bootstrap.started part=outlook connectionId=%s dataSources=%d",
+ connectionId, len(dataSources),
extra={
"event": "ingestion.connection.bootstrap.started",
"part": "outlook",
"connectionId": connectionId,
+ "dataSourceCount": len(dataSources),
},
)
@@ -176,27 +177,52 @@ async def bootstrapOutlook(
mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else ""
userId = str(getattr(connection, "userId", "") or "") if connection is not None else ""
- folderIds = await _selectFolderIds(adapter, limits)
- for folderId in folderIds:
+ cancelled = False
+ for ds in dataSources:
if result.indexed + result.skippedDuplicate >= limits.maxMessages:
break
- try:
- await _ingestFolder(
- adapter=adapter,
- knowledgeService=knowledgeService,
- connectionId=connectionId,
- mandateId=mandateId,
- userId=userId,
- folderId=folderId,
- limits=limits,
- result=result,
- progressCb=progressCb,
- )
- except Exception as exc:
- logger.error("outlook ingestion folder %s failed: %s", folderId, exc, exc_info=True)
- result.errors.append(f"folder({folderId}): {exc}")
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ cancelled = True
+ break
- return _finalizeResult(connectionId, result, startMs)
+ dsId = ds.get("id", "")
+ dsNeutralize = ds.get("neutralize", False)
+ dsLimits = OutlookBootstrapLimits(
+ maxMessages=limits.maxMessages,
+ maxFolders=limits.maxFolders,
+ maxBodyChars=limits.maxBodyChars,
+ includeAttachments=limits.includeAttachments,
+ maxAttachmentBytes=limits.maxAttachmentBytes,
+ maxAgeDays=limits.maxAgeDays,
+ mailContentDepth=limits.mailContentDepth,
+ neutralize=dsNeutralize,
+ )
+
+ folderIds = await _selectFolderIds(adapter, dsLimits)
+ for folderId in folderIds:
+ if result.indexed + result.skippedDuplicate >= dsLimits.maxMessages:
+ break
+ try:
+ await _ingestFolder(
+ adapter=adapter,
+ knowledgeService=knowledgeService,
+ connectionId=connectionId,
+ mandateId=mandateId,
+ userId=userId,
+ folderId=folderId,
+ limits=dsLimits,
+ result=result,
+ progressCb=progressCb,
+ dataSourceId=dsId,
+ )
+ except Exception as exc:
+ logger.error("outlook ingestion folder %s failed: %s", folderId, exc, exc_info=True)
+ result.errors.append(f"folder({folderId}): {exc}")
+
+ finalResult = _finalizeResult(connectionId, result, startMs)
+ if cancelled:
+ finalResult["cancelled"] = True
+ return finalResult
async def _resolveDependencies(connectionId: str):
@@ -266,8 +292,12 @@ async def _ingestFolder(
folderId: str,
limits: OutlookBootstrapLimits,
result: OutlookBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ return
+
remaining = limits.maxMessages - (result.indexed + result.skippedDuplicate)
if remaining <= 0:
return
@@ -307,6 +337,8 @@ async def _ingestFolder(
for message in page.get("value", []) or []:
if result.indexed + result.skippedDuplicate >= limits.maxMessages:
break
+ if progressCb and hasattr(progressCb, "isCancelled") and (result.indexed + result.skippedDuplicate) % 50 == 0 and progressCb.isCancelled():
+ return
await _ingestMessage(
adapter=adapter,
knowledgeService=knowledgeService,
@@ -317,6 +349,7 @@ async def _ingestFolder(
limits=limits,
result=result,
progressCb=progressCb,
+ dataSourceId=dataSourceId,
)
nextLink = page.get("@odata.nextLink")
@@ -338,7 +371,8 @@ async def _ingestMessage(
message: Dict[str, Any],
limits: OutlookBootstrapLimits,
result: OutlookBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
@@ -369,6 +403,7 @@ async def _ingestMessage(
neutralize=limits.neutralize,
provenance={
"connectionId": connectionId,
+ "dataSourceId": dataSourceId,
"authority": "msft",
"service": "outlook",
"externalItemId": messageId,
@@ -402,6 +437,7 @@ async def _ingestMessage(
parentSyntheticId=syntheticId,
limits=limits,
result=result,
+ dataSourceId=dataSourceId,
)
except Exception as exc:
logger.warning("outlook attachments %s failed: %s", messageId, exc)
@@ -443,6 +479,7 @@ async def _ingestAttachments(
parentSyntheticId: str,
limits: OutlookBootstrapLimits,
result: OutlookBootstrapResult,
+ dataSourceId: str = "",
) -> None:
"""Child ingestion jobs for file attachments (skip inline & oversized)."""
from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
@@ -531,6 +568,7 @@ async def _ingestAttachments(
neutralize=limits.neutralize,
provenance={
"connectionId": connectionId,
+ "dataSourceId": dataSourceId,
"authority": "msft",
"service": "outlook",
"parentId": parentSyntheticId,
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py
index 07fef7a8..f664f1a8 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py
@@ -94,35 +94,36 @@ def _toContentObjects(extracted, fileName: str) -> List[Dict[str, Any]]:
async def bootstrapSharepoint(
connectionId: str,
*,
- progressCb: Optional[Callable[[int, Optional[str]], None]] = None,
+ dataSources: Optional[List[Dict[str, Any]]] = None,
+ progressCb: Optional[Any] = None,
adapter: Any = None,
connection: Any = None,
knowledgeService: Any = None,
limits: Optional[SharepointBootstrapLimits] = None,
runExtractionFn: Optional[Callable[..., Any]] = None,
) -> Dict[str, Any]:
- """Enumerate SharePoint drives and ingest every reachable file via the façade.
+ """Enumerate SharePoint drives and ingest files via the facade.
- Parameters allow injection for tests; production callers pass only
- `connectionId` (and optionally a progressCb) and everything else is
- resolved against the registered services.
+ Iterates only over explicitly provided dataSources (ragIndexEnabled=true).
+ Each DataSource defines the root path + neutralize policy for its subtree.
"""
- from modules.serviceCenter.services.serviceKnowledge.subConnectorPrefs import loadConnectionPrefs
- prefs = loadConnectionPrefs(connectionId)
+ if not dataSources:
+ return {"connectionId": connectionId, "skipped": True, "reason": "no_datasources"}
if not limits:
- limits = SharepointBootstrapLimits(neutralize=prefs.neutralizeBeforeEmbed)
+ limits = SharepointBootstrapLimits()
startMs = time.time()
result = SharepointBootstrapResult(connectionId=connectionId)
logger.info(
- "ingestion.connection.bootstrap.started part=sharepoint connectionId=%s",
- connectionId,
+ "ingestion.connection.bootstrap.started part=sharepoint connectionId=%s dataSources=%d",
+ connectionId, len(dataSources),
extra={
"event": "ingestion.connection.bootstrap.started",
"part": "sharepoint",
"connectionId": connectionId,
+ "dataSourceCount": len(dataSources),
},
)
@@ -142,17 +143,27 @@ async def bootstrapSharepoint(
mandateId = str(getattr(connection, "mandateId", "") or "") if connection is not None else ""
userId = str(getattr(connection, "userId", "") or "") if connection is not None else ""
- try:
- sites = await adapter.browse("/", limit=limits.maxSites)
- except Exception as exc:
- logger.error("sharepoint site discovery failed for %s: %s", connectionId, exc, exc_info=True)
- result.errors.append(f"site_discovery: {exc}")
- return _finalizeResult(connectionId, result, startMs)
-
- for site in sites[: limits.maxSites]:
+ cancelled = False
+ for ds in dataSources:
if result.indexed + result.skippedDuplicate >= limits.maxItems:
break
- sitePath = getattr(site, "path", "") or ""
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ cancelled = True
+ break
+
+ dsPath = ds.get("path", "")
+ dsId = ds.get("id", "")
+ dsNeutralize = ds.get("neutralize", False)
+ dsLimits = SharepointBootstrapLimits(
+ maxItems=limits.maxItems,
+ maxBytes=limits.maxBytes,
+ maxFileSize=limits.maxFileSize,
+ skipMimePrefixes=limits.skipMimePrefixes,
+ maxDepth=limits.maxDepth,
+ maxSites=limits.maxSites,
+ neutralize=dsNeutralize,
+ )
+
try:
await _walkFolder(
adapter=adapter,
@@ -161,17 +172,21 @@ async def bootstrapSharepoint(
connectionId=connectionId,
mandateId=mandateId,
userId=userId,
- folderPath=sitePath,
+ folderPath=dsPath,
depth=0,
- limits=limits,
+ limits=dsLimits,
result=result,
progressCb=progressCb,
+ dataSourceId=dsId,
)
except Exception as exc:
- logger.error("sharepoint walk failed for site %s: %s", sitePath, exc, exc_info=True)
- result.errors.append(f"walk({sitePath}): {exc}")
+ logger.error("sharepoint walk failed for ds %s path %s: %s", dsId, dsPath, exc, exc_info=True)
+ result.errors.append(f"walk({dsPath}): {exc}")
- return _finalizeResult(connectionId, result, startMs)
+ finalResult = _finalizeResult(connectionId, result, startMs)
+ if cancelled:
+ finalResult["cancelled"] = True
+ return finalResult
async def _resolveDependencies(connectionId: str):
@@ -221,10 +236,13 @@ async def _walkFolder(
depth: int,
limits: SharepointBootstrapLimits,
result: SharepointBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
if depth > limits.maxDepth:
return
+ if progressCb and hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
+ return
try:
entries = await adapter.browse(folderPath)
except Exception as exc:
@@ -237,6 +255,8 @@ async def _walkFolder(
return
if result.bytesProcessed >= limits.maxBytes:
return
+ if progressCb and hasattr(progressCb, "isCancelled") and (result.indexed + result.skippedDuplicate) % 50 == 0 and progressCb.isCancelled():
+ return
entryPath = getattr(entry, "path", "") or ""
if getattr(entry, "isFolder", False):
@@ -252,6 +272,7 @@ async def _walkFolder(
limits=limits,
result=result,
progressCb=progressCb,
+ dataSourceId=dataSourceId,
)
continue
@@ -283,6 +304,7 @@ async def _walkFolder(
limits=limits,
result=result,
progressCb=progressCb,
+ dataSourceId=dataSourceId,
)
@@ -301,7 +323,8 @@ async def _ingestOne(
revision: Optional[str],
limits: SharepointBootstrapLimits,
result: SharepointBootstrapResult,
- progressCb: Optional[Callable[[int, Optional[str]], None]],
+ progressCb: Optional[Any],
+ dataSourceId: str = "",
) -> None:
from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob
@@ -339,6 +362,7 @@ async def _ingestOne(
provenance: Dict[str, Any] = {
"connectionId": connectionId,
+ "dataSourceId": dataSourceId,
"authority": "msft",
"service": "sharepoint",
"externalItemId": externalItemId,
diff --git a/modules/serviceCenter/services/serviceKnowledge/subPolicyResolver.py b/modules/serviceCenter/services/serviceKnowledge/subPolicyResolver.py
new file mode 100644
index 00000000..10be150d
--- /dev/null
+++ b/modules/serviceCenter/services/serviceKnowledge/subPolicyResolver.py
@@ -0,0 +1,78 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Resolve effective policies (neutralize, ragIndexEnabled) for DataSource tree hierarchies.
+
+Tree-inheritance rule: nearest ancestor DataSource with an explicit value wins.
+If no ancestor has a value, the default (False) is used.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any, Dict, List
+
+logger = logging.getLogger(__name__)
+
+
+def resolveEffectiveNeutralize(
+ ds: Dict[str, Any],
+ allDataSources: List[Dict[str, Any]],
+) -> bool:
+ """Compute effective neutralize by walking up the path tree.
+
+ A DataSource at /sites/HR/Documents inherits from /sites/HR if
+ that ancestor has neutralize=True and the child has no explicit override.
+ """
+ ownValue = ds.get("neutralize")
+    if ownValue is True:
+        return True
+ if ownValue is False:
+ return False
+ return _findAncestorPolicy(ds, allDataSources, "neutralize")
+
+
+def resolveEffectiveRagIndexEnabled(
+ ds: Dict[str, Any],
+ allDataSources: List[Dict[str, Any]],
+) -> bool:
+ """Compute effective ragIndexEnabled by walking up the path tree."""
+ ownValue = ds.get("ragIndexEnabled")
+ if ownValue is True:
+ return True
+ if ownValue is False:
+ return False
+ return _findAncestorPolicy(ds, allDataSources, "ragIndexEnabled")
+
+
+def _findAncestorPolicy(
+ ds: Dict[str, Any],
+ allDataSources: List[Dict[str, Any]],
+ field: str,
+) -> bool:
+ """Walk ancestors (longest-prefix match) to find an inherited policy value."""
+ dsPath = ds.get("path", "")
+ connectionId = ds.get("connectionId", "")
+ if not dsPath:
+ return False
+
+ ancestors = []
+ for candidate in allDataSources:
+ if candidate.get("id") == ds.get("id"):
+ continue
+ if candidate.get("connectionId") != connectionId:
+ continue
+ candidatePath = candidate.get("path", "")
+ if not candidatePath:
+ continue
+        # Compare on path-segment boundaries so "/sites/HR" is not treated
+        # as an ancestor of "/sites/HRX".
+        prefix = candidatePath.rstrip("/") + "/"
+        if dsPath.startswith(prefix) and len(candidatePath) < len(dsPath):
+            ancestors.append(candidate)
+
+ ancestors.sort(key=lambda a: len(a.get("path", "")), reverse=True)
+
+ for ancestor in ancestors:
+ val = ancestor.get(field)
+ if val is True:
+ return True
+ if val is False:
+ return False
+ return False
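
Editorial note: a worked example of the nearest-ancestor rule (records are illustrative):

    allDs = [
        {"id": "a", "connectionId": "c1", "path": "/sites/HR", "neutralize": True},
        {"id": "b", "connectionId": "c1", "path": "/sites/HR/Documents", "neutralize": None},
        {"id": "c", "connectionId": "c1", "path": "/sites/HR/Documents/Public", "neutralize": False},
    ]
    assert resolveEffectiveNeutralize(allDs[1], allDs) is True   # inherits from /sites/HR
    assert resolveEffectiveNeutralize(allDs[2], allDs) is False  # explicit override wins
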
diff --git a/modules/system/mainSystem.py b/modules/system/mainSystem.py
index b7e45006..21d0cbee 100644
--- a/modules/system/mainSystem.py
+++ b/modules/system/mainSystem.py
@@ -144,6 +144,14 @@ NAVIGATION_SECTIONS = [
"path": "/automations",
"order": 30,
},
+ {
+ "id": "rag-inventory",
+ "objectKey": "ui.system.ragInventory",
+ "label": t("RAG-Inventar"),
+ "icon": "FaDatabase",
+ "path": "/rag-inventory",
+ "order": 35,
+ },
{
"id": "store",
"objectKey": "ui.system.store",
diff --git a/scripts/script_db_migrate_datasource_rag.py b/scripts/script_db_migrate_datasource_rag.py
new file mode 100644
index 00000000..95c2ae35
--- /dev/null
+++ b/scripts/script_db_migrate_datasource_rag.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python3
+"""Migration: Rename DataSource.autoSync -> ragIndexEnabled, lastSynced -> lastIndexed.
+
+This is a one-off migration for the RAG consent & control unification.
+Safe to run multiple times (checks column existence before acting).
+
+Usage:
+    python scripts/script_db_migrate_datasource_rag.py [--dry-run]
+"""
+
+import os
+import sys
+import argparse
+import logging
+from pathlib import Path
+
+scriptPath = Path(__file__).resolve()
+gatewayPath = scriptPath.parent.parent
+sys.path.insert(0, str(gatewayPath))
+os.chdir(str(gatewayPath))
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", force=True)
+logger = logging.getLogger(__name__)
+
+import psycopg2
+from modules.shared.configuration import APP_CONFIG
+
+
+def _getConnection():
+ return psycopg2.connect(
+ host=APP_CONFIG.get("DB_HOST", "localhost"),
+ port=int(APP_CONFIG.get("DB_PORT", "5432")),
+ database=APP_CONFIG.get("DB_DATABASE", "poweron_app"),
+ user=APP_CONFIG.get("DB_USER"),
+ password=APP_CONFIG.get("DB_PASSWORD_SECRET"),
+ )
+
+
+def _columnExists(cur, table: str, column: str) -> bool:
+ cur.execute(
+ """SELECT 1 FROM information_schema.columns
+ WHERE table_schema = 'public' AND table_name = %s AND column_name = %s""",
+ (table, column),
+ )
+ return cur.fetchone() is not None
+
+
+def migrate(dryRun: bool = False):
+ conn = _getConnection()
+ conn.autocommit = False
+ cur = conn.cursor()
+
+ renames = [
+ ("DataSource", "autoSync", "ragIndexEnabled"),
+ ("DataSource", "lastSynced", "lastIndexed"),
+ ]
+
+ executed = []
+ for table, oldCol, newCol in renames:
+ if _columnExists(cur, table, oldCol) and not _columnExists(cur, table, newCol):
+ sql = f'ALTER TABLE public."{table}" RENAME COLUMN "{oldCol}" TO "{newCol}";'
+ logger.info("EXEC: %s", sql)
+ if not dryRun:
+ cur.execute(sql)
+ executed.append(sql)
+ elif _columnExists(cur, table, newCol):
+ logger.info("SKIP: %s.%s already exists (migration already applied)", table, newCol)
+ elif not _columnExists(cur, table, oldCol):
+ logger.warning("SKIP: %s.%s does not exist (table schema may differ)", table, oldCol)
+
+ if not dryRun and executed:
+ conn.commit()
+ logger.info("Migration committed (%d statements)", len(executed))
+ elif dryRun and executed:
+ conn.rollback()
+ logger.info("DRY RUN — would execute %d statements", len(executed))
+ else:
+ logger.info("Nothing to do — schema already up to date")
+
+ cur.close()
+ conn.close()
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument("--dry-run", action="store_true", help="Print SQL without executing")
+ args = parser.parse_args()
+ migrate(dryRun=args.dry_run)
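
Editorial note: since the script checks column existence before acting, re-running it is a no-op. A quick post-migration verification sketch reusing the script's own helpers (hypothetical standalone usage):

    conn = _getConnection()
    cur = conn.cursor()
    assert _columnExists(cur, "DataSource", "ragIndexEnabled")
    assert _columnExists(cur, "DataSource", "lastIndexed")
    assert not _columnExists(cur, "DataSource", "autoSync")
    cur.close()
    conn.close()
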
diff --git a/tests/unit/services/test_bootstrap_clickup.py b/tests/unit/services/test_bootstrap_clickup.py
index 87c08c3d..4ed0c4f1 100644
--- a/tests/unit/services/test_bootstrap_clickup.py
+++ b/tests/unit/services/test_bootstrap_clickup.py
@@ -100,6 +100,9 @@ def _adapter(svc):
return SimpleNamespace(_svc=svc)
+_DEFAULT_DS = [{"id": "ds-1", "neutralize": False}]
+
+
def test_bootstrap_walks_team_space_lists_and_tasks():
svc = _FakeClickupService(taskCount=2)
knowledge = _FakeKnowledgeService()
@@ -108,6 +111,7 @@ def test_bootstrap_walks_team_space_lists_and_tasks():
async def _run():
return await bootstrapClickup(
connectionId="c1",
+ dataSources=_DEFAULT_DS,
adapter=_adapter(svc),
connection=connection,
knowledgeService=knowledge,
@@ -126,10 +130,10 @@ def test_bootstrap_walks_team_space_lists_and_tasks():
assert job.mimeType == "application/vnd.clickup.task+json"
assert job.mandateId == "m1"
assert job.provenance["connectionId"] == "c1"
+ assert job.provenance["dataSourceId"] == "ds-1"
assert job.provenance["authority"] == "clickup"
assert job.provenance["teamId"] == "team-1"
assert job.contentVersion # numeric millisecond string
- # At least the header content-object is present.
ids = [co["contentObjectId"] for co in job.contentObjects]
assert "header" in ids
@@ -146,6 +150,7 @@ def test_bootstrap_reports_duplicates_on_second_run():
async def _run():
return await bootstrapClickup(
connectionId="c1",
+ dataSources=_DEFAULT_DS,
adapter=_adapter(svc),
connection=connection,
knowledgeService=knowledge,
@@ -165,6 +170,7 @@ def test_bootstrap_skips_tasks_older_than_maxAgeDays():
async def _run():
return await bootstrapClickup(
connectionId="c1",
+ dataSources=_DEFAULT_DS,
adapter=_adapter(svc),
connection=connection,
knowledgeService=knowledge,
@@ -185,6 +191,7 @@ def test_bootstrap_maxTasks_caps_ingestion():
async def _run():
return await bootstrapClickup(
connectionId="c1",
+ dataSources=_DEFAULT_DS,
adapter=_adapter(svc),
connection=connection,
knowledgeService=knowledge,
@@ -195,9 +202,41 @@ def test_bootstrap_maxTasks_caps_ingestion():
assert result["indexed"] == 3
+def test_bootstrap_skips_when_no_datasources():
+ async def _run():
+ return await bootstrapClickup(connectionId="c1")
+
+ result = asyncio.run(_run())
+ assert result["skipped"] is True
+ assert result["reason"] == "no_datasources"
+
+
+def test_bootstrap_honours_datasource_neutralize():
+ svc = _FakeClickupService(taskCount=1)
+ knowledge = _FakeKnowledgeService()
+ connection = SimpleNamespace(mandateId="m1", userId="u1")
+
+ async def _run():
+ return await bootstrapClickup(
+ connectionId="c1",
+ dataSources=[{"id": "ds-n", "neutralize": True}],
+ adapter=_adapter(svc),
+ connection=connection,
+ knowledgeService=knowledge,
+ limits=ClickupBootstrapLimits(maxAgeDays=None),
+ )
+
+ asyncio.run(_run())
+ for job in knowledge.calls:
+ assert job.neutralize is True
+ assert job.provenance["dataSourceId"] == "ds-n"
+
+
if __name__ == "__main__":
test_bootstrap_walks_team_space_lists_and_tasks()
test_bootstrap_reports_duplicates_on_second_run()
test_bootstrap_skips_tasks_older_than_maxAgeDays()
test_bootstrap_maxTasks_caps_ingestion()
+ test_bootstrap_skips_when_no_datasources()
+ test_bootstrap_honours_datasource_neutralize()
print("OK — bootstrapClickup tests passed")
diff --git a/tests/unit/services/test_bootstrap_gdrive.py b/tests/unit/services/test_bootstrap_gdrive.py
index 1b88677e..2741332f 100644
--- a/tests/unit/services/test_bootstrap_gdrive.py
+++ b/tests/unit/services/test_bootstrap_gdrive.py
@@ -119,6 +119,9 @@ def _fakeRunExtraction(data, name, mime, options):
)
+_DEFAULT_DS = [{"id": "ds1", "path": "/", "neutralize": False}]
+
+
def test_bootstrap_walks_drive_and_subfolders():
adapter = _FakeDriveAdapter()
knowledge = _FakeKnowledgeService()
@@ -127,6 +130,7 @@ def test_bootstrap_walks_drive_and_subfolders():
async def _run():
return await bootstrapGdrive(
connectionId="c1",
+ dataSources=_DEFAULT_DS,
adapter=adapter,
connection=connection,
knowledgeService=knowledge,
@@ -160,6 +164,7 @@ def test_bootstrap_reports_duplicates_on_second_run():
async def _run():
return await bootstrapGdrive(
connectionId="c1",
+ dataSources=_DEFAULT_DS,
adapter=adapter,
connection=connection,
knowledgeService=knowledge,
@@ -180,11 +185,11 @@ def test_bootstrap_skips_files_older_than_maxAgeDays():
async def _run():
return await bootstrapGdrive(
connectionId="c1",
+ dataSources=[{"id": "ds1", "path": "/", "neutralize": False, "maxAgeDays": 180}],
adapter=adapter,
connection=connection,
knowledgeService=knowledge,
runExtractionFn=_fakeRunExtraction,
- limits=GdriveBootstrapLimits(maxAgeDays=180),
)
result = asyncio.run(_run())
@@ -200,6 +205,7 @@ def test_bootstrap_passes_connection_provenance():
async def _run():
return await bootstrapGdrive(
connectionId="c1",
+ dataSources=_DEFAULT_DS,
adapter=adapter,
connection=connection,
knowledgeService=knowledge,
@@ -212,14 +218,25 @@ def test_bootstrap_passes_connection_provenance():
assert job.sourceKind == "gdrive_item"
assert job.mandateId == "m1"
assert job.provenance["connectionId"] == "c1"
+ assert job.provenance["dataSourceId"] == "ds1"
assert job.provenance["authority"] == "google"
assert job.provenance["service"] == "drive"
assert job.contentVersion # modifiedTime ISO string
+def test_bootstrap_skips_when_no_datasources():
+ async def _run():
+ return await bootstrapGdrive(connectionId="c1")
+
+ result = asyncio.run(_run())
+ assert result["skipped"] is True
+ assert result["reason"] == "no_datasources"
+
+
if __name__ == "__main__":
test_bootstrap_walks_drive_and_subfolders()
test_bootstrap_reports_duplicates_on_second_run()
test_bootstrap_skips_files_older_than_maxAgeDays()
test_bootstrap_passes_connection_provenance()
+ test_bootstrap_skips_when_no_datasources()
print("OK — bootstrapGdrive tests passed")
diff --git a/tests/unit/services/test_bootstrap_outlook.py b/tests/unit/services/test_bootstrap_outlook.py
index 26664eaa..c5fea524 100644
--- a/tests/unit/services/test_bootstrap_outlook.py
+++ b/tests/unit/services/test_bootstrap_outlook.py
@@ -111,6 +111,7 @@ def test_bootstrap_outlook_indexes_messages_from_inbox_and_sent():
async def _run():
return await bootstrapOutlook(
connectionId="c1",
+ dataSources=[{"id": "ds1", "neutralize": False}],
adapter=adapter,
connection=connection,
knowledgeService=knowledge,
@@ -129,6 +130,7 @@ def test_bootstrap_outlook_indexes_messages_from_inbox_and_sent():
assert job.sourceKind == "outlook_message"
assert job.mimeType == "message/rfc822"
assert job.provenance["connectionId"] == "c1"
+ assert job.provenance["dataSourceId"] == "ds1"
assert job.provenance["service"] == "outlook"
assert job.contentVersion == "ck1"
assert any(co["contentObjectId"] == "header" for co in job.contentObjects)
@@ -146,6 +148,7 @@ def test_bootstrap_outlook_follows_pagination():
async def _run():
return await bootstrapOutlook(
connectionId="c1",
+ dataSources=[{"id": "ds1", "neutralize": False}],
adapter=adapter,
connection=connection,
knowledgeService=knowledge,
@@ -171,6 +174,7 @@ def test_bootstrap_outlook_reports_duplicates():
async def _run():
return await bootstrapOutlook(
connectionId="c1",
+ dataSources=[{"id": "ds1", "neutralize": False}],
adapter=adapter,
connection=connection,
knowledgeService=knowledge,