From 4064ac0266b67e77625a0726fec1f918f7ac1c88 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Mon, 18 May 2026 07:56:53 +0200
Subject: [PATCH] fixed toggle icons udb
---
app.py | 3 +
modules/datamodels/datamodelBackgroundJob.py | 11 +
modules/datamodels/datamodelDataSource.py | 41 ++-
.../datamodels/datamodelFeatureDataSource.py | 29 +-
.../trustee/accounting/accountingDataSync.py | 9 +-
modules/features/trustee/mainTrustee.py | 21 ++
.../features/trustee/routeFeatureTrustee.py | 24 +-
.../workspace/routeFeatureWorkspace.py | 1 +
modules/routes/routeDataSources.py | 267 ++++++++++++--
modules/routes/routeJobs.py | 18 +-
modules/routes/routeRagInventory.py | 73 +++-
.../mainBackgroundJobService.py | 40 +-
.../services/serviceChat/mainServiceChat.py | 10 +-
.../serviceKnowledge/_costEstimate.py | 86 +++++
.../serviceKnowledge/_inheritFlags.py | 342 ++++++++++++++++++
.../serviceKnowledge/_progressMessages.py | 23 ++
.../services/serviceKnowledge/_ragLimits.py | 107 ++++++
.../subConnectorIngestConsumer.py | 43 ++-
.../subConnectorSyncClickup.py | 27 +-
.../subConnectorSyncGdrive.py | 31 +-
.../serviceKnowledge/subConnectorSyncGmail.py | 6 +-
.../subConnectorSyncKdrive.py | 31 +-
.../subConnectorSyncOutlook.py | 6 +-
.../subConnectorSyncSharepoint.py | 36 +-
.../serviceKnowledge/subPolicyResolver.py | 70 +---
modules/shared/i18nRegistry.py | 42 +++
scripts/debug_rag_job_result.py | 70 ++++
..._db_migrate_backgroundjob_progress_data.py | 97 +++++
.../script_db_migrate_datasource_inherit.py | 110 ++++++
.../script_db_migrate_datasource_settings.py | 102 ++++++
tests/unit/services/test_costEstimate.py | 55 +++
tests/unit/services/test_inheritFlags.py | 330 +++++++++++++++++
.../test_knowledge_ingest_consumer.py | 39 +-
tests/unit/services/test_ragLimits.py | 79 ++++
34 files changed, 2107 insertions(+), 172 deletions(-)
create mode 100644 modules/serviceCenter/services/serviceKnowledge/_costEstimate.py
create mode 100644 modules/serviceCenter/services/serviceKnowledge/_inheritFlags.py
create mode 100644 modules/serviceCenter/services/serviceKnowledge/_progressMessages.py
create mode 100644 modules/serviceCenter/services/serviceKnowledge/_ragLimits.py
create mode 100644 scripts/debug_rag_job_result.py
create mode 100644 scripts/script_db_migrate_backgroundjob_progress_data.py
create mode 100644 scripts/script_db_migrate_datasource_inherit.py
create mode 100644 scripts/script_db_migrate_datasource_settings.py
create mode 100644 tests/unit/services/test_costEstimate.py
create mode 100644 tests/unit/services/test_inheritFlags.py
create mode 100644 tests/unit/services/test_ragLimits.py
diff --git a/app.py b/app.py
index d94c7dd5..93cc8b79 100644
--- a/app.py
+++ b/app.py
@@ -418,6 +418,9 @@ async def lifespan(app: FastAPI):
registerKnowledgeIngestionConsumer,
)
registerKnowledgeIngestionConsumer()
+ # Side-effect import: registers all walker progress message keys
+ # in the i18n registry so `syncRegistryToDb` picks them up.
+ from modules.serviceCenter.services.serviceKnowledge import _progressMessages # noqa: F401
except Exception as e:
logger.warning(f"KnowledgeIngestionConsumer registration failed (non-critical): {e}")
diff --git a/modules/datamodels/datamodelBackgroundJob.py b/modules/datamodels/datamodelBackgroundJob.py
index fa99ea34..809fb994 100644
--- a/modules/datamodels/datamodelBackgroundJob.py
+++ b/modules/datamodels/datamodelBackgroundJob.py
@@ -96,6 +96,17 @@ class BackgroundJob(PowerOnModel):
description="Human-readable current step (e.g. 'Importing journal entries...')",
json_schema_extra={"label": "Fortschritts-Nachricht"},
)
+ progressMessageData: Optional[Dict[str, Any]] = Field(
+ None,
+ description=(
+ "Structured i18n payload for `progressMessage`. Shape: "
+ "{'key': '', 'params': {...}}. "
+ "Frontend renders via `t(key, params)`; older clients fall back "
+ "to `progressMessage`. Single source of truth — keep `progressMessage` "
+ "as the rendered fallback in the producing language."
+ ),
+ json_schema_extra={"label": "Fortschritts-Nachricht (i18n)"},
+ )
payload: Dict[str, Any] = Field(
default_factory=dict,
diff --git a/modules/datamodels/datamodelDataSource.py b/modules/datamodels/datamodelDataSource.py
index fe3f0442..de32bdf3 100644
--- a/modules/datamodels/datamodelDataSource.py
+++ b/modules/datamodels/datamodelDataSource.py
@@ -62,9 +62,14 @@ class DataSource(PowerOnModel):
description="Owner user ID",
json_schema_extra={"label": "Benutzer-ID", "fk_target": {"db": "poweron_app", "table": "UserInDB", "labelField": "username"}},
)
- ragIndexEnabled: bool = Field(
- default=False,
- description="When true this tree element is indexed into the RAG knowledge store",
+ ragIndexEnabled: Optional[bool] = Field(
+ default=None,
+ description=(
+ "Three-state RAG indexing flag with cascade-inherit semantics. "
+ "None = inherit from nearest ancestor DataSource (path-traversal); "
+ "True/False = explicit override that propagates to descendants. "
+ "Walker computes effective value via getEffectiveFlag()."
+ ),
json_schema_extra={"label": "Im RAG indexieren", "frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False},
)
lastIndexed: Optional[float] = Field(
@@ -72,9 +77,13 @@ class DataSource(PowerOnModel):
description="Timestamp of last successful RAG indexing run",
json_schema_extra={"label": "Letzte Indexierung", "frontend_type": "timestamp"},
)
- scope: str = Field(
- default="personal",
- description="Data visibility scope: personal, featureInstance, mandate, global",
+ scope: Optional[str] = Field(
+ default=None,
+ description=(
+ "Data visibility scope with inherit semantics. "
+ "None = inherit; values: personal, featureInstance, mandate, global. "
+ "Cascade-reset on parent toggle."
+ ),
json_schema_extra={"label": "Sichtbarkeit", "frontend_type": "select", "frontend_readonly": False, "frontend_required": False, "frontend_options": [
{"value": "personal", "label": "Persönlich"},
{"value": "featureInstance", "label": "Feature-Instanz"},
@@ -82,11 +91,25 @@ class DataSource(PowerOnModel):
{"value": "global", "label": "Global"},
]},
)
- neutralize: bool = Field(
- default=False,
- description="Whether this data source should be neutralized before AI processing",
+ neutralize: Optional[bool] = Field(
+ default=None,
+ description=(
+ "Three-state neutralization flag with cascade-inherit semantics. "
+ "None = inherit from nearest ancestor DataSource (path-traversal); "
+ "True/False = explicit override that propagates to descendants."
+ ),
json_schema_extra={"label": "Neutralisieren", "frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False},
)
+ settings: Optional[Dict[str, Any]] = Field(
+ default=None,
+ description=(
+ "DataSource-scoped settings (JSON). Currently used keys: "
+ "ragLimits.{maxBytes,maxFileSize,maxItems,maxDepth}. "
+ "Walker reads these directly; missing keys fall back to RAG_LIMITS_DEFAULT "
+ "and are lazily persisted on next bootstrap."
+ ),
+ json_schema_extra={"label": "Einstellungen", "frontend_type": "json", "frontend_readonly": True, "frontend_required": False},
+ )
class ExternalEntry(BaseModel):
diff --git a/modules/datamodels/datamodelFeatureDataSource.py b/modules/datamodels/datamodelFeatureDataSource.py
index dd2c4035..f07a8bda 100644
--- a/modules/datamodels/datamodelFeatureDataSource.py
+++ b/modules/datamodels/datamodelFeatureDataSource.py
@@ -6,7 +6,7 @@ A FeatureDataSource links a FeatureInstance table (DATA_OBJECT) to a workspace
so the agent can query structured feature data (e.g. TrusteePosition rows).
"""
-from typing import Dict, List, Optional
+from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from modules.datamodels.datamodelBase import PowerOnModel
from modules.shared.i18nRegistry import i18nModel
@@ -55,9 +55,12 @@ class FeatureDataSource(PowerOnModel):
description="Workspace feature instance where this source is used",
json_schema_extra={"label": "Workspace", "fk_target": {"db": "poweron_app", "table": "FeatureInstance", "labelField": "label"}},
)
- scope: str = Field(
- default="personal",
- description="Data visibility scope: personal, featureInstance, mandate, global",
+ scope: Optional[str] = Field(
+ default=None,
+ description=(
+ "Data visibility scope with inherit semantics. "
+ "None = inherit; values: personal, featureInstance, mandate, global."
+ ),
json_schema_extra={"label": "Sichtbarkeit", "frontend_type": "select", "frontend_readonly": False, "frontend_required": False, "frontend_options": [
{"value": "personal", "label": "Persönlich"},
{"value": "featureInstance", "label": "Feature-Instanz"},
@@ -65,9 +68,12 @@ class FeatureDataSource(PowerOnModel):
{"value": "global", "label": "Global"},
]},
)
- neutralize: bool = Field(
- default=False,
- description="Whether this data source should be neutralized before AI processing",
+ neutralize: Optional[bool] = Field(
+ default=None,
+ description=(
+ "Three-state neutralization flag with cascade-inherit semantics. "
+ "None = inherit; True/False = explicit. Cascade-reset on parent toggle."
+ ),
json_schema_extra={"label": "Neutralisieren", "frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False},
)
neutralizeFields: Optional[List[str]] = Field(
@@ -80,3 +86,12 @@ class FeatureDataSource(PowerOnModel):
description="Record-level filter applied when querying this table, e.g. {'sessionId': 'abc-123'}",
json_schema_extra={"label": "Datensatzfilter"},
)
+ settings: Optional[Dict[str, Any]] = Field(
+ default=None,
+ description=(
+ "FeatureDataSource-scoped settings (JSON). Currently used keys: "
+ "ragLimits.{maxBytes,maxFileSize,maxItems,maxDepth}. "
+ "Mirror of DataSource.settings so the UDB settings modal can target both."
+ ),
+ json_schema_extra={"label": "Einstellungen", "frontend_type": "json", "frontend_readonly": True, "frontend_required": False},
+ )
diff --git a/modules/features/trustee/accounting/accountingDataSync.py b/modules/features/trustee/accounting/accountingDataSync.py
index 5827dd11..db50d657 100644
--- a/modules/features/trustee/accounting/accountingDataSync.py
+++ b/modules/features/trustee/accounting/accountingDataSync.py
@@ -205,11 +205,16 @@ class AccountingDataSync:
boundary so the UI poll on ``GET /api/jobs/{jobId}`` shows real
movement instead of jumping from 10 % to 100 %. Safe to omit.
"""
- def _progress(pct: int, msg: str) -> None:
+ def _progress(pct: int, msgKey: str, msgParams: Optional[Dict[str, Any]] = None) -> None:
+ """Forward to progressCb using the i18n contract.
+
+ `msgKey` is the German plaintext-as-key; it is translated (with
+ `msgParams` interpolated) when the job progress is rendered for the client.
+ """
if progressCb is None:
return
try:
- progressCb(pct, msg)
+ progressCb(pct, messageKey=msgKey, messageParams=msgParams or {})
except Exception as ex:
logger.warning(f"progressCb failed at {pct}%: {ex}")
from modules.features.trustee.datamodelFeatureTrustee import (
diff --git a/modules/features/trustee/mainTrustee.py b/modules/features/trustee/mainTrustee.py
index 8f725d2f..b3f7cdcf 100644
--- a/modules/features/trustee/mainTrustee.py
+++ b/modules/features/trustee/mainTrustee.py
@@ -12,6 +12,27 @@ from modules.shared.i18nRegistry import t
logger = logging.getLogger(__name__)
+# i18n: register BackgroundJob progress message keys used by routeFeatureTrustee /
+# accountingDataSync. Walker call sites use `progressCb(..., messageKey="…")`
+# without going through `t()`, so we must register each key here as a
+# string-literal `t(...)` call -- per i18n convention `t()` MUST receive a
+# literal so static scanners and the boot-time `syncRegistryToDb` can pick
+# it up. Do NOT collapse these into a loop over a list of variables.
+t("Sync wird vorbereitet ({total} Position(en))...")
+t("Verbindungsaufbau fehlgeschlagen.")
+t("Keine aktive Buchhaltungs-Konfiguration gefunden.")
+t("Position {index}/{total} verarbeitet")
+t("Sync abgeschlossen.")
+t("Initialisiere Import...")
+t("Verbinde mit Buchhaltungssystem...")
+t("Import abgeschlossen.")
+t("Lade Kontenplan...")
+t("Lade Journaleintraege vom Buchhaltungssystem...")
+t("Lade Kunden...")
+t("Lade Lieferanten...")
+t("Lade Kontensaldi vom Buchhaltungssystem...")
+t("Speichere Kontensaldi...")
+
# Feature metadata
FEATURE_CODE = "trustee"
FEATURE_LABEL = t("Treuhand", context="UI")
diff --git a/modules/features/trustee/routeFeatureTrustee.py b/modules/features/trustee/routeFeatureTrustee.py
index 2c9c3328..a71b508f 100644
--- a/modules/features/trustee/routeFeatureTrustee.py
+++ b/modules/features/trustee/routeFeatureTrustee.py
@@ -1644,7 +1644,11 @@ async def _trusteeAccountingPushJobHandler(job: Dict[str, Any], progressCb) -> D
results = []
total = len(positionIds)
- progressCb(2, f"Sync wird vorbereitet ({total} Position(en))...")
+ progressCb(
+ 2,
+ messageKey="Sync wird vorbereitet ({total} Position(en))...",
+ messageParams={"total": total},
+ )
# Resolve connector + plain config once to avoid decryption rate-limits
# (mirrors the optimisation in pushBatchToAccounting). We push positions
@@ -1655,12 +1659,12 @@ async def _trusteeAccountingPushJobHandler(job: Dict[str, Any], progressCb) -> D
connector, plainConfig, configRecord = await bridge._resolveConnectorAndConfig(instanceId)
except Exception as resolveErr:
logger.exception("Accounting push: failed to resolve connector/config")
- progressCb(100, "Verbindungsaufbau fehlgeschlagen.")
+ progressCb(100, messageKey="Verbindungsaufbau fehlgeschlagen.")
raise resolveErr
if not connector or not plainConfig:
results = [SyncResult(success=False, errorMessage="No active accounting configuration found") for _ in positionIds]
- progressCb(100, "Keine aktive Buchhaltungs-Konfiguration gefunden.")
+ progressCb(100, messageKey="Keine aktive Buchhaltungs-Konfiguration gefunden.")
return {
"total": len(results),
"success": 0,
@@ -1680,7 +1684,11 @@ async def _trusteeAccountingPushJobHandler(job: Dict[str, Any], progressCb) -> D
results.append(result)
# Reserve 5..95% for the push loop, keep the tail for summary.
pct = 5 + int(90 * index / total)
- progressCb(pct, f"Position {index}/{total} verarbeitet")
+ progressCb(
+ pct,
+ messageKey="Position {index}/{total} verarbeitet",
+ messageParams={"index": index, "total": total},
+ )
skipped = [r for r in results if not r.success and r.errorMessage and "already synced" in r.errorMessage]
failed = [r for r in results if not r.success and r not in skipped]
@@ -1693,7 +1701,7 @@ async def _trusteeAccountingPushJobHandler(job: Dict[str, Any], progressCb) -> D
"; ".join(r.errorMessage or "unknown" for r in failed[:3]),
)
- progressCb(100, "Sync abgeschlossen.")
+ progressCb(100, messageKey="Sync abgeschlossen.")
return {
"total": len(results),
"success": sum(1 for r in results if r.success),
@@ -1823,10 +1831,10 @@ async def _trusteeAccountingSyncJobHandler(job: Dict[str, Any], progressCb) -> D
payload = job.get("payload") or {}
rootUser = getRootUser()
- progressCb(5, "Initialisiere Import...")
+ progressCb(5, messageKey="Initialisiere Import...")
interface = getInterface(rootUser, mandateId=mandateId, featureInstanceId=instanceId)
sync = AccountingDataSync(interface)
- progressCb(10, "Verbinde mit Buchhaltungssystem...")
+ progressCb(10, messageKey="Verbinde mit Buchhaltungssystem...")
result = await sync.importData(
featureInstanceId=instanceId,
mandateId=mandateId,
@@ -1834,7 +1842,7 @@ async def _trusteeAccountingSyncJobHandler(job: Dict[str, Any], progressCb) -> D
dateTo=payload.get("dateTo"),
progressCb=progressCb,
)
- progressCb(100, "Import abgeschlossen.")
+ progressCb(100, messageKey="Import abgeschlossen.")
return result
diff --git a/modules/features/workspace/routeFeatureWorkspace.py b/modules/features/workspace/routeFeatureWorkspace.py
index 4487e5fe..2fa788e8 100644
--- a/modules/features/workspace/routeFeatureWorkspace.py
+++ b/modules/features/workspace/routeFeatureWorkspace.py
@@ -1324,6 +1324,7 @@ async def listWorkspaceConnections(
"externalUsername": conn.get("externalUsername"),
"externalEmail": conn.get("externalEmail"),
"status": status,
+ "knowledgeIngestionEnabled": bool(conn.get("knowledgeIngestionEnabled")),
})
return JSONResponse({"connections": items})
diff --git a/modules/routes/routeDataSources.py b/modules/routes/routeDataSources.py
index ba398008..5dec19c8 100644
--- a/modules/routes/routeDataSources.py
+++ b/modules/routes/routeDataSources.py
@@ -9,11 +9,40 @@ from fastapi import APIRouter, HTTPException, Depends, Path, Request, Body
from modules.auth import limiter, getRequestContext, RequestContext
from modules.datamodels.datamodelDataSource import DataSource
from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource
+from modules.datamodels.datamodelUam import UserConnection
from modules.shared.i18nRegistry import apiRouteContext
routeApiMsg = apiRouteContext("routeDataSources")
logger = logging.getLogger(__name__)
+
+def _ensureConnectionKnowledgeFlag(rootIf, connectionId: str) -> None:
+ """Forward-only sync: if a DataSource gets RAG-activated, ensure the parent
+ UserConnection.knowledgeIngestionEnabled is true.
+
+ Intentionally NOT bidirectional: disabling the last DataSource does NOT
+ auto-clear knowledgeIngestionEnabled, because the consent flag may have
+ been set explicitly via the Connections page / wizard even before any
+ DataSource exists. Only the master switch (`/knowledge-consent`) may
+ clear it.
+ """
+ if not connectionId:
+ return
+ try:
+ currentConn = rootIf.db.getRecord(UserConnection, connectionId)
+ if not currentConn:
+ return
+ if bool(currentConn.get("knowledgeIngestionEnabled")):
+ return
+ rootIf.db.recordModify(UserConnection, connectionId, {"knowledgeIngestionEnabled": True})
+ logger.info(
+ "Auto-enabled knowledgeIngestionEnabled on UserConnection %s "
+ "(triggered by first active DataSource).",
+ connectionId,
+ )
+ except Exception as e:
+ logger.warning("Could not auto-enable knowledgeIngestionEnabled for connection %s: %s", connectionId, e)
+
router = APIRouter(
prefix="/api/datasources",
tags=["Data Sources"],
@@ -45,26 +74,43 @@ def _findSourceRecord(db, sourceId: str):
def _updateDataSourceScope(
request: Request,
sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
- scope: str = Body(..., embed=True),
+ scope: Optional[str] = Body(None, embed=True),
context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
- """Update the scope of a DataSource or FeatureDataSource. Global scope requires sysAdmin."""
- if scope not in _VALID_SCOPES:
- raise HTTPException(status_code=400, detail=f"Invalid scope: {scope}. Must be one of {_VALID_SCOPES}")
+ """Update the scope of a DataSource or FeatureDataSource. Cascade-resets explicit descendants.
- if scope == "global" and not context.isSysAdmin:
- raise HTTPException(status_code=403, detail=routeApiMsg("Only sysadmins can set global scope"))
+ `scope=None` resets this node to inherit (no cascade). Global scope
+ requires sysAdmin.
+ """
+ if scope is not None:
+ if scope not in _VALID_SCOPES:
+ raise HTTPException(status_code=400, detail=f"Invalid scope: {scope}. Must be one of {_VALID_SCOPES}")
+ if scope == "global" and not context.isSysAdmin:
+ raise HTTPException(status_code=403, detail=routeApiMsg("Only sysadmins can set global scope"))
try:
from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.serviceCenter.services.serviceKnowledge._inheritFlags import (
+ cascadeResetDescendants,
+ cascadeResetDescendantsFds,
+ )
rootIf = getRootInterface()
rec, model = _findSourceRecord(rootIf.db, sourceId)
if not rec:
raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")
rootIf.db.recordModify(model, sourceId, {"scope": scope})
- logger.info("Updated scope=%s for %s %s", scope, model.__name__, sourceId)
- return {"sourceId": sourceId, "scope": scope, "updated": True}
+ cascaded = 0
+ if scope is not None:
+ if model is DataSource:
+ cascaded = cascadeResetDescendants(rootIf, rec, "scope")
+ else:
+ cascaded = cascadeResetDescendantsFds(rootIf, rec, "scope")
+ logger.info(
+ "Updated scope=%s for %s %s (cascade-reset %d descendants)",
+ scope, model.__name__, sourceId, cascaded,
+ )
+ return {"sourceId": sourceId, "scope": scope, "updated": True, "cascadedDescendants": cascaded}
except HTTPException:
raise
except Exception as e:
@@ -77,20 +123,36 @@ def _updateDataSourceScope(
def _updateDataSourceNeutralize(
request: Request,
sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
- neutralize: bool = Body(..., embed=True),
+ neutralize: Optional[bool] = Body(None, embed=True),
context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
- """Toggle the neutralization flag on a DataSource or FeatureDataSource."""
+ """Set neutralize flag on a DataSource or FeatureDataSource. Cascade-resets explicit descendants.
+
+ `neutralize=None` resets this node to inherit (no cascade).
+ """
try:
from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.serviceCenter.services.serviceKnowledge._inheritFlags import (
+ cascadeResetDescendants,
+ cascadeResetDescendantsFds,
+ )
rootIf = getRootInterface()
rec, model = _findSourceRecord(rootIf.db, sourceId)
if not rec:
raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")
rootIf.db.recordModify(model, sourceId, {"neutralize": neutralize})
- logger.info("Updated neutralize=%s for %s %s", neutralize, model.__name__, sourceId)
- return {"sourceId": sourceId, "neutralize": neutralize, "updated": True}
+ cascaded = 0
+ if neutralize is not None:
+ if model is DataSource:
+ cascaded = cascadeResetDescendants(rootIf, rec, "neutralize")
+ else:
+ cascaded = cascadeResetDescendantsFds(rootIf, rec, "neutralize")
+ logger.info(
+ "Updated neutralize=%s for %s %s (cascade-reset %d descendants)",
+ neutralize, model.__name__, sourceId, cascaded,
+ )
+ return {"sourceId": sourceId, "neutralize": neutralize, "updated": True, "cascadedDescendants": cascaded}
except HTTPException:
raise
except Exception as e:
@@ -132,13 +194,14 @@ def _updateNeutralizeFields(
async def _updateDataSourceRagIndex(
request: Request,
sourceId: str = Path(..., description="ID of the DataSource"),
- ragIndexEnabled: bool = Body(..., embed=True),
+ ragIndexEnabled: Optional[bool] = Body(None, embed=True),
context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
- """Toggle RAG indexing for a DataSource.
+ """Set RAG indexing flag on a DataSource. Cascade-resets explicit descendants.
- true: sets flag + enqueues mini-bootstrap for this DataSource only.
- false: sets flag + synchronously purges all chunks from this DataSource.
+ `ragIndexEnabled=None` resets this node to inherit (no cascade, no purge,
+ no bootstrap — the node simply follows its ancestor chain afterwards).
+ `True` enqueues a mini-bootstrap. `False` synchronously purges chunks.
Must be `async def` so `await startJob(...)` registers `_runJob` in the
main event loop. Sync route → worker thread → temporary loop closes
@@ -146,18 +209,26 @@ async def _updateDataSourceRagIndex(
"""
try:
from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.serviceCenter.services.serviceKnowledge._inheritFlags import cascadeResetDescendants
rootIf = getRootInterface()
rec = rootIf.db.getRecord(DataSource, sourceId)
if not rec:
raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")
rootIf.db.recordModify(DataSource, sourceId, {"ragIndexEnabled": ragIndexEnabled})
- logger.info("Updated ragIndexEnabled=%s for DataSource %s", ragIndexEnabled, sourceId)
+ cascaded = 0
+ if ragIndexEnabled is not None:
+ cascaded = cascadeResetDescendants(rootIf, rec, "ragIndexEnabled")
+ logger.info(
+ "Updated ragIndexEnabled=%s for DataSource %s (cascade-reset %d descendants)",
+ ragIndexEnabled, sourceId, cascaded,
+ )
- if ragIndexEnabled:
+ connectionId = rec.get("connectionId") or rec.get("connection_id") or ""
+ if ragIndexEnabled is True:
+ _ensureConnectionKnowledgeFlag(rootIf, connectionId)
from modules.serviceCenter.services.serviceBackgroundJobs import startJob
- connectionId = rec.get("connectionId") or rec.get("connection_id") or ""
conn = rootIf.getUserConnectionById(connectionId) if connectionId else None
authority = ""
if conn:
@@ -168,7 +239,7 @@ async def _updateDataSourceRagIndex(
{"connectionId": connectionId, "authority": authority.lower(), "dataSourceIds": [sourceId]},
triggeredBy=str(context.user.id),
)
- else:
+ elif ragIndexEnabled is False:
from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
purgeResult = getKnowledgeInterface(None).deleteFileContentIndexByDataSource(sourceId)
logger.info("Purged %d index rows / %d chunks for DataSource %s",
@@ -182,12 +253,164 @@ async def _updateDataSourceRagIndex(
mandateId=context.mandateId,
category=AuditCategory.PERMISSION.value,
action="rag_index_toggled",
- details=json.dumps({"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled}),
+ details=json.dumps({"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled, "cascadedDescendants": cascaded}),
)
- return {"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled, "updated": True}
+ return {"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled, "updated": True, "cascadedDescendants": cascaded}
except HTTPException:
raise
except Exception as e:
logger.error("Error updating datasource ragIndexEnabled: %s", e)
raise HTTPException(status_code=500, detail=str(e))
+
+
+_CLICKUP_SOURCE_TYPES = {"clickup", "clickupList", "clickupSpace", "clickupFolder"}
+_ALLOWED_RAG_LIMIT_KEYS = {
+ "files": {"maxItems", "maxBytes", "maxFileSize", "maxDepth"},
+ "clickup": {"maxTasks", "maxWorkspaces", "maxListsPerWorkspace"},
+}
+
+
+def _kindForSource(rec: Dict[str, Any], model) -> str:
+ """Map a DataSource record to a RAG-limits kind ('files' or 'clickup').
+
+ FeatureDataSource (tables, not file walkers) reports as 'files' so the
+ same UI/limit shape works; the limits simply won't be consumed by any
+ walker today but are stored for forward-compat.
+ """
+ if model is FeatureDataSource:
+ return "files"
+ sourceType = str(rec.get("sourceType") or "").strip()
+ return "clickup" if sourceType in _CLICKUP_SOURCE_TYPES else "files"
+
+
+def _sanitizeRagLimits(kind: str, raw: Any) -> Dict[str, int]:
+ """Coerce an incoming ragLimits dict to {allowedKey: positive int}.
+
+ Unknown keys are silently dropped; non-positive or non-numeric values
+ are rejected with 400.
+ """
+ if not isinstance(raw, dict):
+ raise HTTPException(status_code=400, detail="ragLimits must be an object")
+ allowed = _ALLOWED_RAG_LIMIT_KEYS.get(kind, set())
+ cleaned: Dict[str, int] = {}
+ for key, value in raw.items():
+ if key not in allowed:
+ continue
+ try:
+ intValue = int(value)
+ except (TypeError, ValueError):
+ raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be an integer")
+ if intValue <= 0:
+ raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be > 0")
+ cleaned[key] = intValue
+ return cleaned
+
+
+@router.patch("/{sourceId}/settings")
+@limiter.limit("30/minute")
+def _updateDataSourceSettings(
+ request: Request,
+ sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
+ settings: Dict[str, Any] = Body(..., embed=True),
+ context: RequestContext = Depends(getRequestContext),
+) -> Dict[str, Any]:
+ """Merge `settings` into a DataSource or FeatureDataSource (existing keys kept; `ragLimits` merged per key).
+
+ Currently supports `ragLimits` only. Unknown top-level keys in the body are
+ rejected to avoid silently storing garbage that no consumer reads.
+
+ The owner and sysAdmins may always modify; other users only when they are a
+ mandate admin and the record's stored scope is not personal (unset = personal).
+ """
+ if not isinstance(settings, dict):
+ raise HTTPException(status_code=400, detail="settings must be an object")
+ unknown = set(settings.keys()) - {"ragLimits"}
+ if unknown:
+ raise HTTPException(status_code=400, detail=f"Unknown settings keys: {sorted(unknown)}")
+
+ try:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ rootIf = getRootInterface()
+ rec, model = _findSourceRecord(rootIf.db, sourceId)
+ if not rec:
+ raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")
+
+ ownerId = str(rec.get("userId") or "")
+ currentUserId = str(context.user.id)
+ if ownerId and ownerId != currentUserId and not context.isSysAdmin:
+ scope = str(rec.get("scope") or "personal")
+ isMandateAdmin = getattr(context, "isMandateAdmin", False)
+ if scope == "personal" or not isMandateAdmin:
+ raise HTTPException(status_code=403, detail="Not allowed to modify this DataSource's settings")
+
+ kind = _kindForSource(rec, model)
+
+ currentSettings = rec.get("settings") or {}
+ if not isinstance(currentSettings, dict):
+ currentSettings = {}
+ newSettings = dict(currentSettings)
+
+ if "ragLimits" in settings:
+ cleanedLimits = _sanitizeRagLimits(kind, settings["ragLimits"])
+ mergedLimits = dict(currentSettings.get("ragLimits") or {})
+ mergedLimits.update(cleanedLimits)
+ newSettings["ragLimits"] = mergedLimits
+
+ rootIf.db.recordModify(model, sourceId, {"settings": newSettings})
+
+ import json
+ from modules.shared.auditLogger import audit_logger
+ from modules.datamodels.datamodelAudit import AuditCategory
+ audit_logger.logEvent(
+ userId=currentUserId,
+ mandateId=context.mandateId,
+ category=AuditCategory.PERMISSION.value,
+ action="datasource_settings_changed",
+ details=json.dumps({
+ "sourceId": sourceId,
+ "model": model.__name__,
+ "oldSettings": currentSettings,
+ "newSettings": newSettings,
+ }),
+ )
+ logger.info("Updated settings on %s %s by user %s", model.__name__, sourceId, currentUserId)
+ return {"sourceId": sourceId, "settings": newSettings, "updated": True}
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error("Error updating datasource settings: %s", e, exc_info=True)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get("/{sourceId}/cost-estimate")
+@limiter.limit("60/minute")
+def _getDataSourceCostEstimate(
+ request: Request,
+ sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
+ context: RequestContext = Depends(getRequestContext),
+) -> Dict[str, Any]:
+ """Return an indicative full-sync cost estimate for the given DataSource.
+
+ Uses the current effective ragLimits (DataSource.settings.ragLimits with
+ fallback to centralized defaults) as the basis. Returns the same
+ `{estimatedTokens, estimatedUsd, basis}` shape (plus `sourceId`) regardless of source kind.
+ """
+ try:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ from modules.serviceCenter.services.serviceKnowledge import _ragLimits, _costEstimate
+ rootIf = getRootInterface()
+ rec, model = _findSourceRecord(rootIf.db, sourceId)
+ if not rec:
+ raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")
+
+ kind = _kindForSource(rec, model)
+ effective = _ragLimits.getRagLimits(rec, kind)
+ estimate = _costEstimate.estimateBootstrapCost(effective, kind=kind)
+ estimate["sourceId"] = sourceId
+ return estimate
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error("Error computing cost estimate: %s", e, exc_info=True)
+ raise HTTPException(status_code=500, detail=str(e))
diff --git a/modules/routes/routeJobs.py b/modules/routes/routeJobs.py
index d2124a0b..9cd89d46 100644
--- a/modules/routes/routeJobs.py
+++ b/modules/routes/routeJobs.py
@@ -21,7 +21,7 @@ from modules.serviceCenter.services.serviceBackgroundJobs import (
getJobStatus,
listJobs,
)
-from modules.shared.i18nRegistry import apiRouteContext
+from modules.shared.i18nRegistry import apiRouteContext, resolveJobMessage
logger = logging.getLogger(__name__)
routeApiMsg = apiRouteContext("routeJobs")
@@ -34,8 +34,20 @@ router = APIRouter(
def _serialiseJob(job: Dict[str, Any]) -> Dict[str, Any]:
- """Strip system audit fields and ensure JSON-safe types."""
- return {k: v for k, v in job.items() if not k.startswith("sys")}
+ """Strip system audit fields, ensure JSON-safe types, translate progress.
+
+ Walkers store progress as a structured payload (``progressMessageData =
+ {key, params}``). The frontend never calls ``t()`` on backend-supplied
+ keys (i18n convention #2), so we resolve the payload here using the
+ request-context language and overwrite ``progressMessage`` with the
+ fully rendered string. Older clients keep working because they read
+ the same field.
+ """
+ out = {k: v for k, v in job.items() if not k.startswith("sys")}
+ translated = resolveJobMessage(out.get("progressMessageData"))
+ if translated:
+ out["progressMessage"] = translated
+ return out
def _userHasMandateAccess(context: RequestContext, mandateId: Optional[str]) -> bool:
diff --git a/modules/routes/routeRagInventory.py b/modules/routes/routeRagInventory.py
index 7c426d77..99d5c4df 100644
--- a/modules/routes/routeRagInventory.py
+++ b/modules/routes/routeRagInventory.py
@@ -8,7 +8,7 @@ from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Depends, Request
from modules.auth import limiter, getCurrentUser, getRequestContext, RequestContext
from modules.datamodels.datamodelUam import User
-from modules.shared.i18nRegistry import apiRouteContext
+from modules.shared.i18nRegistry import apiRouteContext, resolveJobMessage
routeApiMsg = apiRouteContext("routeRagInventory")
logger = logging.getLogger(__name__)
@@ -24,6 +24,53 @@ router = APIRouter(
)
+_SUB_RESULT_KEYS = ("sharepoint", "outlook", "drive", "gmail", "clickup", "kdrive")
+
+
+def _flattenJobResult(result: Dict[str, Any]) -> Dict[str, Any]:
+ """Bootstrap handlers nest per-service results (e.g. msft returns
+ `{"sharepoint": {...}, "outlook": {...}}`). The UI needs per-connection
+ aggregates AND the first hit limit, so we sum the counters and pick the
+ most informative `stoppedAtLimit` across sub-services.
+
+ Returns a flat dict with the same keys the UI expects on `lastSuccess`.
+ """
+ subResults = [result[k] for k in _SUB_RESULT_KEYS if isinstance(result.get(k), dict)]
+ if not subResults:
+ # Single-service handler that returns flat dict directly (legacy path).
+ return result
+
+ indexed = sum(int(r.get("indexed") or 0) for r in subResults)
+ skippedDup = sum(int(r.get("skippedDuplicate") or 0) for r in subResults)
+ skippedPol = sum(int(r.get("skippedPolicy") or 0) for r in subResults)
+ failed = sum(int(r.get("failed") or 0) for r in subResults)
+ bytes_ = sum(int(r.get("bytesProcessed") or 0) for r in subResults)
+ # Parallel sub-services: wall-clock ≈ slowest one.
+ durationMs = max((int(r.get("durationMs") or 0) for r in subResults), default=0)
+
+ # First sub-service that hit a limit wins — UI shows one banner per
+ # connection; if multiple stopped, the first one is informative enough
+ # and the user re-runs after raising that budget.
+ stoppedAtLimit: Optional[str] = None
+ limits: Dict[str, Any] = {}
+ for r in subResults:
+ if r.get("stoppedAtLimit"):
+ stoppedAtLimit = r["stoppedAtLimit"]
+ limits = r.get("limits") or {}
+ break
+
+ return {
+ "indexed": indexed,
+ "skippedDuplicate": skippedDup,
+ "skippedPolicy": skippedPol,
+ "failed": failed,
+ "bytesProcessed": bytes_,
+ "durationMs": durationMs,
+ "stoppedAtLimit": stoppedAtLimit,
+ "limits": limits,
+ }
+
+
def _buildConnectionInventory(connections, rootIf, knowledgeIf, jobService) -> List[Dict[str, Any]]:
"""Build per-connection RAG inventory rows.
@@ -111,7 +158,17 @@ def _buildConnectionInventory(connections, rootIf, knowledgeIf, jobService) -> L
jobs = jobService.listJobs(jobType="connection.bootstrap", limit=50)
connJobs = [j for j in jobs if (j.get("payload") or {}).get("connectionId") == connectionId]
runningJobs = [
- {"jobId": j["id"], "progress": j.get("progress", 0), "progressMessage": j.get("progressMessage", "")}
+ {
+ "jobId": j["id"],
+ "progress": j.get("progress", 0),
+ # Server-side translate the structured walker payload into
+ # the request-context language; frontend renders 1:1 (no
+ # `t()` on backend-supplied keys).
+ "progressMessage": (
+ resolveJobMessage(j.get("progressMessageData"))
+ or j.get("progressMessage", "")
+ ),
+ }
for j in connJobs
if j.get("status") in ("PENDING", "RUNNING")
]
@@ -126,7 +183,12 @@ def _buildConnectionInventory(connections, rootIf, knowledgeIf, jobService) -> L
"finishedAt": j.get("finishedAt"),
}
elif status == "SUCCESS" and lastSuccess is None:
- result = j.get("result") or {}
+ # Bootstrap handlers may return either a flat dict (single
+ # service) or a nested dict keyed by sub-service (e.g. msft
+ # returns {"sharepoint": {...}, "outlook": {...}}). Flatten
+ # so the UI always sees aggregated counters and the first
+ # sub-service that hit a limit.
+ result = _flattenJobResult(j.get("result") or {})
lastSuccess = {
"jobId": j["id"],
"finishedAt": j.get("finishedAt"),
@@ -337,7 +399,10 @@ def _getActiveJobs(
"connectionLabel": getattr(conn, "displayLabel", None) or getattr(conn, "authority", connId),
"jobType": j.get("jobType", "connection.bootstrap"),
"progress": j.get("progress", 0),
- "progressMessage": j.get("progressMessage", ""),
+ "progressMessage": (
+ resolveJobMessage(j.get("progressMessageData"))
+ or j.get("progressMessage", "")
+ ),
})
return active
except Exception as e:
diff --git a/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py b/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py
index e27dae58..90b69bce 100644
--- a/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py
+++ b/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py
@@ -54,19 +54,53 @@ _CANCEL_CHECK_INTERVAL_S = 3.0
class JobProgressCallback:
- """Callable progress reporter with cooperative cancel-check for long-running walkers."""
+ """Callable progress reporter with cooperative cancel-check for long-running walkers.
+
+ Two ways to set a progress message:
+ progressCb(50, "145 Dateien verarbeitet") # legacy plaintext (DE)
+ progressCb(50, messageKey="{n} Dateien verarbeitet",
+ messageParams={"n": 145}) # i18n-friendly
+
+ When `messageKey` is given the structured payload is written to
+ `BackgroundJob.progressMessageData` so the API routes can translate it
+ server-side via `resolveJobMessage` (the frontend never calls `t()` on
+ backend-supplied keys). A best-effort rendered fallback is also stored
+ in `progressMessage` for older clients, logs, and audit trails.
+ """
def __init__(self, jobId: str):
self._jobId = jobId
self._cancelledCache: Optional[bool] = None
self._lastCheckedAt: float = 0.0
- def __call__(self, progress: int, message: Optional[str] = None) -> None:
+ def __call__(
+ self,
+ progress: int,
+ message: Optional[str] = None,
+ *,
+ messageKey: Optional[str] = None,
+ messageParams: Optional[Dict[str, Any]] = None,
+ ) -> None:
try:
clamped = max(0, min(100, int(progress)))
fields: Dict[str, Any] = {"progress": clamped}
- if message is not None:
+
+ if messageKey is not None:
+ params = messageParams or {}
+ try:
+ fallback = messageKey.format(**params)
+ except (KeyError, IndexError, ValueError) as fmtErr:
+ fallback = message or messageKey
+ logger.warning(
+ "progressCb message format failed for job %s key=%r params=%r: %s",
+ self._jobId, messageKey, params, fmtErr,
+ )
+ fields["progressMessageData"] = {"key": messageKey, "params": params}
+ fields["progressMessage"] = (message or fallback)[:500]
+ elif message is not None:
fields["progressMessage"] = message[:500]
+ fields["progressMessageData"] = None
+
_updateJob(self._jobId, fields)
except Exception as ex:
logger.warning("Progress update failed for job %s: %s", self._jobId, ex)
diff --git a/modules/serviceCenter/services/serviceChat/mainServiceChat.py b/modules/serviceCenter/services/serviceChat/mainServiceChat.py
index 2ca61d7e..61026de0 100644
--- a/modules/serviceCenter/services/serviceChat/mainServiceChat.py
+++ b/modules/serviceCenter/services/serviceChat/mainServiceChat.py
@@ -534,11 +534,17 @@ class ChatService:
) -> Dict[str, Any]:
"""Create a new external data source reference.
- Returns existing record if connectionId + path already exists (upsert semantics).
+ Upsert key is `(connectionId, sourceType, path)`. The same `path='/'`
+ can carry multiple DataSources discriminated by sourceType: the
+ Connection-Root (sourceType=<authority>, e.g. 'msft') plus one per
+ service (sourceType='sharepointFolder', 'outlookFolder', ...). The
+ sourceType filter MUST be present, otherwise a Service-Root POST
+ returns the Connection-Root and toggles cascade onto every sibling.
"""
from modules.datamodels.datamodelDataSource import DataSource
existing = self.interfaceDbApp.db.getRecordset(
- DataSource, recordFilter={"connectionId": connectionId, "path": path}
+ DataSource,
+ recordFilter={"connectionId": connectionId, "sourceType": sourceType, "path": path},
)
if existing:
return existing[0] if isinstance(existing[0], dict) else existing[0].model_dump()
diff --git a/modules/serviceCenter/services/serviceKnowledge/_costEstimate.py b/modules/serviceCenter/services/serviceKnowledge/_costEstimate.py
new file mode 100644
index 00000000..565c219d
--- /dev/null
+++ b/modules/serviceCenter/services/serviceKnowledge/_costEstimate.py
@@ -0,0 +1,86 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Indicative cost estimation for a RAG bootstrap run.
+
+This is **not** a billing-grade forecast: it gives the user a back-of-the-envelope
+USD figure for the worst-case full sync, so they can sanity-check before raising
+`maxBytes`/`maxItems`. The output always carries the underlying assumptions
+(`basis`) so the user can judge plausibility.
+
+Heuristic:
+ estimatedTokens = ceil(maxBytes * EXTRACTABLE_FRACTION / BYTES_PER_TOKEN_TEXT_FACTOR)
+ estimatedUsd = estimatedTokens / 1_000_000 * EMBEDDING_USD_PER_MTOKEN
+
+Defaults match OpenAI `text-embedding-3-small` pricing (2026-Q2).
+"""
+
+from __future__ import annotations
+
+import math
+from typing import Any, Dict
+
+
+CHARS_PER_TOKEN = 4
+EMBEDDING_USD_PER_MTOKEN = 0.02
+DEFAULT_TOKENS_PER_ITEM = 1500
+BYTES_PER_TOKEN_TEXT_FACTOR = 4
+EXTRACTABLE_FRACTION = 0.4
+
+
+def estimateBootstrapCost(limits: Dict[str, int], kind: str = "files") -> Dict[str, Any]:
+ """Return an indicative cost estimate dict for a DataSource bootstrap.
+
+ Returned shape::
+
+ {
+ "estimatedTokens": int,
+ "estimatedUsd": float, # rounded to 4 decimals
+ "basis": {
+ "kind": "files"|"clickup",
+ "limits": {...},
+ "assumptions": {
+ "embeddingUsdPerMToken": 0.02,
+ "charsPerToken": 4,
+ "extractableFraction": 0.4,
+ "tokensPerItem": 1500 # only for clickup-like item counts
+ },
+ "notes": "non-binding, depends on real file content..."
+ }
+ }
+ """
+ assumptions: Dict[str, Any] = {
+ "embeddingUsdPerMToken": EMBEDDING_USD_PER_MTOKEN,
+ "charsPerToken": CHARS_PER_TOKEN,
+ }
+
+ if kind == "files":
+ maxBytes = int(limits.get("maxBytes") or 0)
+ extractableBytes = maxBytes * EXTRACTABLE_FRACTION
+ estimatedTokens = int(math.ceil(extractableBytes / BYTES_PER_TOKEN_TEXT_FACTOR))
+ assumptions["extractableFraction"] = EXTRACTABLE_FRACTION
+ assumptions["formula"] = "ceil(maxBytes * 0.4 / 4)"
+ elif kind == "clickup":
+ maxTasks = int(limits.get("maxTasks") or 0)
+ maxWorkspaces = max(1, int(limits.get("maxWorkspaces") or 1))
+ estimatedTokens = maxTasks * maxWorkspaces * DEFAULT_TOKENS_PER_ITEM
+ assumptions["tokensPerItem"] = DEFAULT_TOKENS_PER_ITEM
+ assumptions["formula"] = "maxTasks * maxWorkspaces * 1500"
+ else:
+ estimatedTokens = 0
+ assumptions["formula"] = "unknown kind, returning zero"
+
+ estimatedUsd = round(estimatedTokens / 1_000_000 * EMBEDDING_USD_PER_MTOKEN, 4)
+
+ return {
+ "estimatedTokens": estimatedTokens,
+ "estimatedUsd": estimatedUsd,
+ "basis": {
+ "kind": kind,
+ "limits": dict(limits),
+ "assumptions": assumptions,
+ "notes": (
+ "Indicative only. Actual cost depends on file types, extractable text "
+ "ratio, dedup hit-rate, retries, and current embedding model pricing."
+ ),
+ },
+ }
diff --git a/modules/serviceCenter/services/serviceKnowledge/_inheritFlags.py b/modules/serviceCenter/services/serviceKnowledge/_inheritFlags.py
new file mode 100644
index 00000000..00180c9f
--- /dev/null
+++ b/modules/serviceCenter/services/serviceKnowledge/_inheritFlags.py
@@ -0,0 +1,342 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Cascade-inherit semantics for DataSource flags (neutralize, ragIndexEnabled, scope).
+
+Three-state flags allow tree elements to either set an explicit value or
+inherit the value from their nearest ancestor in the path hierarchy. The
+walker (RAG/Neutralize) and routes resolve the *effective* value; the cascade
+helper resets explicit descendant values when a parent is toggled.
+
+Path-traversal rules:
+- A DataSource is identified by `(connectionId, sourceType, path)`.
+- The root of a service tree is `path == '/'`.
+- Sub-elements have paths like `/folder1/sub`. Their parent path is the
+ longest prefix path that exists as a DataSource record (string-based).
+- If no ancestor with an explicit value exists, the default is `False`
+ (or `'personal'` for scope) — matching the legacy behavior of NULL = inherit.
+"""
+
+import logging
+from typing import Any, Dict, Iterable, List, Optional, Tuple
+
+logger = logging.getLogger(__name__)
+
+_INHERITABLE_FLAGS = ("neutralize", "ragIndexEnabled", "scope")
+
+# Connection-root DataSources carry the authority as their sourceType
+# (e.g. 'msft', 'google'). They sit one level above all service DataSources
+# of the same connection in the visual tree, so flag inheritance must
+# cross sourceType boundaries — but ONLY from these authority roots.
+_AUTHORITY_SOURCE_TYPES = frozenset({"local", "google", "msft", "clickup", "infomaniak"})
+
+
+def _normalisePath(path: Optional[str]) -> str:
+ """Normalize a DataSource path to '/'-prefixed, no trailing slash (except root)."""
+ if not path:
+ return "/"
+ p = str(path).strip()
+ if not p.startswith("/"):
+ p = "/" + p
+ if len(p) > 1 and p.endswith("/"):
+ p = p.rstrip("/")
+ return p
+
+
+def _flagDefault(flag: str) -> Any:
+ if flag == "scope":
+ return "personal"
+ return False
+
+
+def _isExplicit(value: Any) -> bool:
+ """A flag value is explicit when it is not None.
+
+ Note: legacy rows may carry empty-string scope; treat as inherit too.
+ """
+ if value is None:
+ return False
+ if isinstance(value, str) and value == "":
+ return False
+ return True
+
+
+def _getRecordValue(rec: Any, key: str) -> Any:
+ if isinstance(rec, dict):
+ return rec.get(key)
+ return getattr(rec, key, None)
+
+
+def _findAncestorChain(
+ rec: Dict[str, Any],
+ allDs: Iterable[Dict[str, Any]],
+) -> List[Dict[str, Any]]:
+ """Return all ancestor DataSources of `rec` in the same connection,
+ ordered nearest-first.
+
+ Two ancestor relations are merged:
+ 1) **same-sourceType path-ancestor** — strict path-prefix within the
+ same service tree (sharepointFolder, gmailFolder, ...).
+ 2) **connection-root ancestor** — a DS with `path='/'` and
+ `sourceType` ∈ authority set (msft, google, ...) is the parent of
+ every other DS in that connection regardless of sourceType, so a
+ toggle on the connection node propagates to all services beneath.
+
+ The connection-root is always the most distant ancestor and therefore
+ sorts after any same-sourceType ancestors.
+ """
+ recPath = _normalisePath(_getRecordValue(rec, "path"))
+ recSourceType = _getRecordValue(rec, "sourceType")
+ recConnectionId = _getRecordValue(rec, "connectionId")
+ sameTypeCandidates: List[Tuple[int, Dict[str, Any]]] = []
+ connectionRoot: Optional[Dict[str, Any]] = None
+ recIsConnectionRoot = recSourceType in _AUTHORITY_SOURCE_TYPES and recPath == "/"
+ for cand in allDs:
+ if _getRecordValue(cand, "id") == _getRecordValue(rec, "id"):
+ continue
+ if _getRecordValue(cand, "connectionId") != recConnectionId:
+ continue
+ candSourceType = _getRecordValue(cand, "sourceType")
+ candPath = _normalisePath(_getRecordValue(cand, "path"))
+ if candSourceType == recSourceType:
+ if candPath == recPath or not _isAncestorPath(candPath, recPath):
+ continue
+ sameTypeCandidates.append((len(candPath), cand))
+ elif (
+ not recIsConnectionRoot
+ and candSourceType in _AUTHORITY_SOURCE_TYPES
+ and candPath == "/"
+ ):
+ connectionRoot = cand
+ sameTypeCandidates.sort(key=lambda x: x[0], reverse=True)
+ chain = [c for _, c in sameTypeCandidates]
+ if connectionRoot is not None:
+ chain.append(connectionRoot)
+ return chain
+
+
+def _isAncestorPath(ancestor: str, descendant: str) -> bool:
+ """True iff `ancestor` is a strict path-prefix of `descendant`.
+
+ '/' is ancestor of every non-root path. For non-root prefixes, the
+ descendant must continue with '/' so '/foo' isn't treated as ancestor of
+ '/foobar'.
+ """
+ if ancestor == descendant:
+ return False
+ if ancestor == "/":
+ return descendant != "/"
+ return descendant.startswith(ancestor + "/")
+
+
+def getEffectiveFlag(
+ rec: Dict[str, Any],
+ flag: str,
+ sameConnectionDs: Iterable[Dict[str, Any]],
+) -> Any:
+ """Resolve the effective value of a flag via path-traversal.
+
+ Order: own value (if explicit) → nearest ancestor with explicit value →
+ static default (`False` or `'personal'`).
+ """
+ if flag not in _INHERITABLE_FLAGS:
+ raise ValueError(f"Unknown inheritable flag: {flag}")
+ own = _getRecordValue(rec, flag)
+ if _isExplicit(own):
+ return own
+ chain = _findAncestorChain(rec, sameConnectionDs)
+ for ancestor in chain:
+ ancestorVal = _getRecordValue(ancestor, flag)
+ if _isExplicit(ancestorVal):
+ return ancestorVal
+ return _flagDefault(flag)
+
+
+def cascadeResetDescendants(
+ rootIf: Any,
+ parentRec: Dict[str, Any],
+ flag: str,
+) -> int:
+ """Reset all explicit descendant values of `flag` to NULL (= inherit).
+
+ Descendant relation mirrors `_findAncestorChain`:
+ - Connection-root (`path='/'` AND `sourceType` ∈ authorities) is parent
+ of every other DS in that connection (cross-sourceType cascade).
+ - Otherwise: same-sourceType strict path-descendants only.
+
+ Only the targeted `flag` is reset; other flags on the descendant are
+ untouched.
+
+ Returns the number of records updated.
+ """
+ if flag not in _INHERITABLE_FLAGS:
+ raise ValueError(f"Unknown inheritable flag: {flag}")
+ from modules.datamodels.datamodelDataSource import DataSource
+
+ connectionId = _getRecordValue(parentRec, "connectionId")
+ parentSourceType = _getRecordValue(parentRec, "sourceType")
+ parentPath = _normalisePath(_getRecordValue(parentRec, "path"))
+ parentId = _getRecordValue(parentRec, "id")
+ if not connectionId or not parentSourceType:
+ return 0
+
+ parentIsConnectionRoot = (
+ parentSourceType in _AUTHORITY_SOURCE_TYPES and parentPath == "/"
+ )
+
+ siblings = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId})
+ affected = 0
+ for sib in siblings:
+ sibId = _getRecordValue(sib, "id")
+ if sibId == parentId:
+ continue
+ sibSourceType = _getRecordValue(sib, "sourceType")
+ sibPath = _normalisePath(_getRecordValue(sib, "path"))
+ if parentIsConnectionRoot:
+ # Connection-root resets everything else under this connection.
+ pass
+ else:
+ if sibSourceType != parentSourceType:
+ continue
+ if not _isAncestorPath(parentPath, sibPath):
+ continue
+ sibVal = _getRecordValue(sib, flag)
+ if not _isExplicit(sibVal):
+ continue
+ try:
+ rootIf.db.recordModify(DataSource, sibId, {flag: None})
+ affected += 1
+ except Exception as exc:
+ logger.warning("Cascade-reset failed for DataSource %s flag=%s: %s", sibId, flag, exc)
+ if affected:
+ logger.info(
+ "Cascade-reset %s on %d descendants of DataSource (connectionId=%s, sourceType=%s, path=%s, connectionRoot=%s)",
+ flag, affected, connectionId, parentSourceType, parentPath, parentIsConnectionRoot,
+ )
+ return affected
+
+
+def _fdsClassify(fds: Dict[str, Any]) -> str:
+ """Return 'workspace' | 'table' | 'record' based on the FDS identifier shape."""
+ tableName = _getRecordValue(fds, "tableName") or ""
+ recordFilter = _getRecordValue(fds, "recordFilter")
+ if tableName == "*":
+ return "workspace"
+ if not recordFilter:
+ return "table"
+ return "record"
+
+
+def _fdsIsAncestor(parent: Dict[str, Any], child: Dict[str, Any]) -> bool:
+ """Return True iff `parent` FDS is a strict ancestor of `child` FDS.
+
+ Hierarchy within one `workspaceInstanceId`:
+ workspace-wildcard (tableName='*') → table-wildcard (tableName='X', !recordFilter)
+ → record-fds (tableName='X', recordFilter.id=...)
+ table-wildcard (tableName='X') → record-fds (tableName='X', recordFilter.id=...)
+ """
+ parentWsId = _getRecordValue(parent, "workspaceInstanceId")
+ childWsId = _getRecordValue(child, "workspaceInstanceId")
+ if not parentWsId or parentWsId != childWsId:
+ return False
+ if _getRecordValue(parent, "id") == _getRecordValue(child, "id"):
+ return False
+ parentKind = _fdsClassify(parent)
+ childKind = _fdsClassify(child)
+ if parentKind == "workspace":
+ return childKind in ("table", "record")
+ if parentKind == "table":
+ if childKind != "record":
+ return False
+ return _getRecordValue(parent, "tableName") == _getRecordValue(child, "tableName")
+ return False
+
+
+def getEffectiveFlagFds(
+ rec: Dict[str, Any],
+ flag: str,
+ sameWorkspaceFds: Iterable[Dict[str, Any]],
+) -> Any:
+ """Resolve effective value of a FeatureDataSource flag.
+
+ Order: own (if explicit) → table-wildcard (if explicit) →
+ workspace-wildcard (if explicit) → static default.
+ """
+ if flag not in ("neutralize", "scope"):
+ raise ValueError(f"Unknown inheritable FDS flag: {flag}")
+ own = _getRecordValue(rec, flag)
+ if _isExplicit(own):
+ return own
+ workspaceFds: List[Dict[str, Any]] = list(sameWorkspaceFds)
+ ancestors = [a for a in workspaceFds if _fdsIsAncestor(a, rec)]
+ ancestors.sort(key=lambda a: 0 if _fdsClassify(a) == "table" else 1)
+ for ancestor in ancestors:
+ val = _getRecordValue(ancestor, flag)
+ if _isExplicit(val):
+ return val
+ return _flagDefault(flag)
+
+
+def cascadeResetDescendantsFds(
+ rootIf: Any,
+ parentRec: Dict[str, Any],
+ flag: str,
+) -> int:
+ """Reset explicit `flag` to NULL on every descendant FDS of `parentRec`.
+
+ Only the targeted flag is reset; other flags on descendants are untouched.
+ Returns the number of records updated.
+ """
+ if flag not in ("neutralize", "scope"):
+ raise ValueError(f"Unknown inheritable FDS flag: {flag}")
+ from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource
+
+ workspaceInstanceId = _getRecordValue(parentRec, "workspaceInstanceId")
+ if not workspaceInstanceId:
+ return 0
+ siblings = rootIf.db.getRecordset(
+ FeatureDataSource, recordFilter={"workspaceInstanceId": workspaceInstanceId}
+ )
+ affected = 0
+ for sib in siblings:
+ if not _fdsIsAncestor(parentRec, sib):
+ continue
+ sibVal = _getRecordValue(sib, flag)
+ if not _isExplicit(sibVal):
+ continue
+ sibId = _getRecordValue(sib, "id")
+ try:
+ rootIf.db.recordModify(FeatureDataSource, sibId, {flag: None})
+ affected += 1
+ except Exception as exc:
+ logger.warning("FDS cascade-reset failed for %s flag=%s: %s", sibId, flag, exc)
+ if affected:
+ logger.info(
+ "FDS cascade-reset %s on %d descendants of FDS (workspaceInstanceId=%s, kind=%s)",
+ flag, affected, workspaceInstanceId, _fdsClassify(parentRec),
+ )
+ return affected
+
+
+def buildEffectiveByConnection(
+ dataSources: Iterable[Dict[str, Any]],
+ flag: str,
+) -> Dict[str, Any]:
+ """Pre-compute the effective value of `flag` for every DataSource id.
+
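+ Useful for batch operations (walker, route DTOs) that touch many records at once. O(N²) worst case, N bounded per connection.
+ NOTE(review): records are grouped by (connectionId, sourceType), so a connection-root of another sourceType is never seen as ancestor here — confirm intended.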
+ """
+ if flag not in _INHERITABLE_FLAGS:
+ raise ValueError(f"Unknown inheritable flag: {flag}")
+ bySourceType: Dict[Tuple[str, str], List[Dict[str, Any]]] = {}
+ for ds in dataSources:
+ connId = _getRecordValue(ds, "connectionId") or ""
+ srcType = _getRecordValue(ds, "sourceType") or ""
+ bySourceType.setdefault((connId, srcType), []).append(ds)
+
+ out: Dict[str, Any] = {}
+ for group in bySourceType.values():
+ for rec in group:
+ recId = _getRecordValue(rec, "id")
+ out[recId] = getEffectiveFlag(rec, flag, group)
+ return out
diff --git a/modules/serviceCenter/services/serviceKnowledge/_progressMessages.py b/modules/serviceCenter/services/serviceKnowledge/_progressMessages.py
new file mode 100644
index 00000000..99d91d6b
--- /dev/null
+++ b/modules/serviceCenter/services/serviceKnowledge/_progressMessages.py
@@ -0,0 +1,23 @@
+"""Central i18n registration for BackgroundJob progress messages.
+
+Walkers and consumers report progress via ``progressCb(..., messageKey="…",
+messageParams={...})``. Those keys are not seen by ``t()`` at call time, so
+without a stub registration they would never make it into the boot-time
+``UiLanguageSet(xx)`` sync. Importing this module is enough to register
+every known key — call sites stay clean while translators can still find
+the texts in the standard i18n table.
+
+Keep this list in lockstep with the ``messageKey=`` arguments used in
+``subConnectorSync*.py`` and ``subConnectorIngestConsumer.py``.
+"""
+
+from modules.shared.i18nRegistry import t
+
+# Bootstrap walkers (one per connector family)
+t("{n} Dateien verarbeitet, {indexed} indexiert")
+t("{n} Tasks verarbeitet, {indexed} indexiert")
+t("{n} Mails verarbeitet, {indexed} indexiert")
+
+# Ingestion consumer hand-offs
+t("Verbindung wird aufgebaut ({authority})")
+t("Synchronisierung läuft...")
diff --git a/modules/serviceCenter/services/serviceKnowledge/_ragLimits.py b/modules/serviceCenter/services/serviceKnowledge/_ragLimits.py
new file mode 100644
index 00000000..de0a4886
--- /dev/null
+++ b/modules/serviceCenter/services/serviceKnowledge/_ragLimits.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Centralized RAG bootstrap limits + DataSource-scoped resolution.
+
+The original walkers (SharePoint, kDrive, gDrive, ClickUp) each carried their
+own module-level `MAX_*_DEFAULT` constants and silently stopped indexing once
+they were exceeded. That made it impossible for a user with a 500 MB folder to
+override the 200 MB cap without a code change.
+
+This module is the single source of truth for two things:
+
+1. The canonical default budget per source kind (`FILES_LIMITS_DEFAULT`,
+ `CLICKUP_LIMITS_DEFAULT`). Walkers fall back to these when a DataSource has
+ no `settings.ragLimits` yet.
+
+2. The pure read helpers that walkers and the API use to merge a
+ DataSource's stored settings with the defaults. No override layers, no
+ resolver chain: what is in `DataSource.settings.ragLimits` is what the
+ walker uses.
+
+Lazy fill (not done here; every helper in this module is a pure read): on first
+processing, the defaults are written to a DataSource's `settings.ragLimits` so
+the UI shows real values even if the user has never opened the settings modal.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any, Dict, Optional
+
+
+logger = logging.getLogger(__name__)
+
+
+FILES_LIMITS_DEFAULT: Dict[str, int] = {
+ "maxItems": 500,
+ "maxBytes": 200 * 1024 * 1024,
+ "maxFileSize": 25 * 1024 * 1024,
+ "maxDepth": 4,
+}
+
+
+CLICKUP_LIMITS_DEFAULT: Dict[str, int] = {
+ "maxTasks": 500,
+ "maxWorkspaces": 3,
+ "maxListsPerWorkspace": 20,
+}
+
+
+_LIMITS_BY_KIND: Dict[str, Dict[str, int]] = {
+ "files": FILES_LIMITS_DEFAULT,
+ "clickup": CLICKUP_LIMITS_DEFAULT,
+}
+
+
+def getDefaults(kind: str) -> Dict[str, int]:
+ """Return a fresh copy of the default budget for the given walker kind.
+
+ `kind` is either "files" (Sharepoint, kDrive, gDrive) or "clickup".
+ Returning a copy lets callers mutate the result safely.
+ """
+ defaults = _LIMITS_BY_KIND.get(kind)
+ if defaults is None:
+ raise ValueError(f"Unknown RAG limit kind: {kind!r}")
+ return dict(defaults)
+
+
+def getStoredOverrides(dataSource: Optional[Dict[str, Any]], kind: str) -> Dict[str, int]:
+ """Return ONLY the limits explicitly set on `dataSource.settings.ragLimits`.
+
+ Missing keys are NOT filled with defaults — that is the caller's job (so
+ a programmatically supplied `limits=` from a Caller still wins when the
+ DataSource has no override). Pure read, no DB writes.
+ """
+ if not isinstance(dataSource, dict):
+ return {}
+ settings = dataSource.get("settings") or {}
+ if not isinstance(settings, dict):
+ return {}
+ stored = settings.get("ragLimits")
+ if not isinstance(stored, dict):
+ return {}
+ allowed = set(_LIMITS_BY_KIND.get(kind, {}).keys())
+ out: Dict[str, int] = {}
+ for key, raw in stored.items():
+ if key not in allowed or raw is None:
+ continue
+ try:
+ out[key] = int(raw)
+ except (TypeError, ValueError):
+ logger.warning(
+ "Ignoring non-int ragLimits[%s]=%r on DataSource %s",
+ key, raw, dataSource.get("id"),
+ )
+ return out
+
+
+def getRagLimits(dataSource: Optional[Dict[str, Any]], kind: str) -> Dict[str, int]:
+ """Effective RAG limits for the API/cost-estimate use-case.
+
+ Stored overrides win over `getDefaults(kind)`. Walkers should NOT use this
+ function — they should pass their own caller-limits as the fallback so that
+ a runtime-supplied `limits=` parameter is honoured (see `getStoredOverrides`).
+ """
+ base = getDefaults(kind)
+ base.update(getStoredOverrides(dataSource, kind))
+ return base
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py
index c86aed86..618a9965 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py
@@ -141,18 +141,39 @@ _SOURCE_TYPE_MAP = {
def _loadRagEnabledDataSources(connectionId: str, dataSourceIds: Optional[list] = None):
- """Load DataSource rows with ragIndexEnabled=true for a connection.
+ """Load DataSource rows whose *effective* ragIndexEnabled is True.
- If dataSourceIds is provided (mini-bootstrap), filter to only those IDs.
+ Cascade-inherit semantics: a DataSource with `ragIndexEnabled=None`
+ follows its nearest ancestor's value (path-traversal). Walker iterates
+ over all DataSources whose effective value resolves to True, including
+ inherited ones.
+
+ Returned dicts carry **resolved** flags (`neutralize`, `scope`) so the
+ downstream walkers can keep reading `ds.get("neutralize")` directly
+ without having to know about the inheritance chain.
+
+ If `dataSourceIds` is provided (mini-bootstrap), the explicit set is
+ intersected with the effective-true set.
"""
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelDataSource import DataSource
+ from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag
rootIf = getRootInterface()
allDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId})
+ resolved = []
+ for ds in allDs:
+ effRagIndex = getEffectiveFlag(ds, "ragIndexEnabled", allDs)
+ if effRagIndex is not True:
+ continue
+ dsCopy = dict(ds) if isinstance(ds, dict) else {**ds.__dict__}
+ dsCopy["neutralize"] = getEffectiveFlag(ds, "neutralize", allDs)
+ dsCopy["scope"] = getEffectiveFlag(ds, "scope", allDs)
+ dsCopy["ragIndexEnabled"] = True
+ resolved.append(dsCopy)
if dataSourceIds:
- return [ds for ds in allDs if ds.get("id") in dataSourceIds and ds.get("ragIndexEnabled")]
- return [ds for ds in allDs if ds.get("ragIndexEnabled")]
+ resolved = [ds for ds in resolved if ds.get("id") in dataSourceIds]
+ return resolved
async def _bootstrapJobHandler(
@@ -167,7 +188,11 @@ async def _bootstrapJobHandler(
if not connectionId:
raise ValueError("connection.bootstrap requires payload.connectionId")
- progressCb(5, f"resolving {authority} connection")
+ progressCb(
+ 5,
+ messageKey="Verbindung wird aufgebaut ({authority})",
+ messageParams={"authority": authority},
+ )
# Defensive consent check
try:
@@ -225,7 +250,7 @@ async def _bootstrapJobHandler(
bootstrapOutlook,
)
- progressCb(0, "Synchronisierung läuft...")
+ progressCb(0, messageKey="Synchronisierung läuft...")
spDs = _filterDs("sharepoint")
olDs = _filterDs("outlook")
async def _noopResult():
@@ -251,7 +276,7 @@ async def _bootstrapJobHandler(
bootstrapGmail,
)
- progressCb(0, "Synchronisierung läuft...")
+ progressCb(0, messageKey="Synchronisierung läuft...")
gdDs = _filterDs("drive")
gmDs = _filterDs("gmail")
async def _noopResult():
@@ -274,7 +299,7 @@ async def _bootstrapJobHandler(
bootstrapClickup,
)
- progressCb(0, "Synchronisierung läuft...")
+ progressCb(0, messageKey="Synchronisierung läuft...")
cuDs = _filterDs("clickup")
cuResult = await bootstrapClickup(connectionId=connectionId, progressCb=progressCb, dataSources=cuDs) if cuDs else {"skipped": True, "reason": "no_datasources"}
return {
@@ -288,7 +313,7 @@ async def _bootstrapJobHandler(
bootstrapKdrive,
)
- progressCb(0, "Synchronisierung läuft...")
+ progressCb(0, messageKey="Synchronisierung läuft...")
kdDs = _filterDs("kdrive")
kdResult = await bootstrapKdrive(connectionId=connectionId, progressCb=progressCb, dataSources=kdDs) if kdDs else {"skipped": True, "reason": "no_datasources"}
return {
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py
index 959e42c9..28c24275 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py
@@ -33,13 +33,21 @@ from modules.serviceCenter.services.serviceKnowledge.subWalkerHelpers import (
logger = logging.getLogger(__name__)
-MAX_TASKS_DEFAULT = 500
-MAX_WORKSPACES_DEFAULT = 3
-MAX_LISTS_PER_WORKSPACE_DEFAULT = 20
+from modules.serviceCenter.services.serviceKnowledge import _ragLimits as _ragLimitsHelper
+
+_CLICKUP_DEFAULTS = _ragLimitsHelper.CLICKUP_LIMITS_DEFAULT
+MAX_TASKS_DEFAULT = _CLICKUP_DEFAULTS["maxTasks"]
+MAX_WORKSPACES_DEFAULT = _CLICKUP_DEFAULTS["maxWorkspaces"]
+MAX_LISTS_PER_WORKSPACE_DEFAULT = _CLICKUP_DEFAULTS["maxListsPerWorkspace"]
MAX_DESCRIPTION_CHARS_DEFAULT = 8000
MAX_AGE_DAYS_DEFAULT = 180
+def _resolveDataSourceLimits(dsId: str, ds: Dict[str, Any]) -> Dict[str, int]:
+    """Fetch the RAG-limit overrides stored on a ClickUp DataSource.
+
+    Delegates to ``_ragLimits.getStoredOverrides`` with kind ``"clickup"``;
+    the result is ``{}`` when nothing is stored. ``dsId`` is currently not
+    used by this helper.
+    """
+    storedOverrides = _ragLimitsHelper.getStoredOverrides(ds, "clickup")
+    return storedOverrides
+
+
@dataclass
class ClickupBootstrapLimits:
maxTasks: int = MAX_TASKS_DEFAULT
@@ -236,10 +244,11 @@ async def bootstrapClickup(
dsId = ds.get("id", "")
dsNeutralize = ds.get("neutralize", False)
+ eff = _resolveDataSourceLimits(dsId, ds)
dsLimits = ClickupBootstrapLimits(
- maxTasks=limits.maxTasks,
- maxWorkspaces=limits.maxWorkspaces,
- maxListsPerWorkspace=limits.maxListsPerWorkspace,
+ maxTasks=eff.get("maxTasks", limits.maxTasks),
+ maxWorkspaces=eff.get("maxWorkspaces", limits.maxWorkspaces),
+ maxListsPerWorkspace=eff.get("maxListsPerWorkspace", limits.maxListsPerWorkspace),
maxDescriptionChars=limits.maxDescriptionChars,
maxAgeDays=limits.maxAgeDays,
includeClosed=limits.includeClosed,
@@ -520,7 +529,11 @@ async def _ingestTask(
if hasattr(progressCb, "isCancelled") and progressCb.isCancelled():
return
try:
- progressCb(0, f"{processed} Tasks verarbeitet, {result.indexed} indexiert")
+ progressCb(
+ 0,
+ messageKey="{n} Tasks verarbeitet, {indexed} indexiert",
+ messageParams={"n": processed, "indexed": result.indexed},
+ )
except Exception:
pass
if processed % 50 == 0:
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py
index e27abacb..7600cce0 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py
@@ -31,13 +31,21 @@ from modules.serviceCenter.services.serviceKnowledge.subWalkerHelpers import (
logger = logging.getLogger(__name__)
-MAX_ITEMS_DEFAULT = 500
-MAX_BYTES_DEFAULT = 200 * 1024 * 1024
-MAX_FILE_SIZE_DEFAULT = 25 * 1024 * 1024
+from modules.serviceCenter.services.serviceKnowledge import _ragLimits as _ragLimitsHelper
+
+_FILES_DEFAULTS = _ragLimitsHelper.FILES_LIMITS_DEFAULT
+MAX_ITEMS_DEFAULT = _FILES_DEFAULTS["maxItems"]
+MAX_BYTES_DEFAULT = _FILES_DEFAULTS["maxBytes"]
+MAX_FILE_SIZE_DEFAULT = _FILES_DEFAULTS["maxFileSize"]
+MAX_DEPTH_DEFAULT = _FILES_DEFAULTS["maxDepth"]
SKIP_MIME_PREFIXES_DEFAULT = ("video/", "audio/")
-MAX_DEPTH_DEFAULT = 4
MAX_AGE_DAYS_DEFAULT = 365
+
+def _resolveDataSourceLimits(dsId: str, ds: Dict[str, Any]) -> Dict[str, int]:
+    """Fetch the RAG-limit overrides stored on a Google Drive DataSource.
+
+    Delegates to ``_ragLimits.getStoredOverrides`` with kind ``"files"``;
+    the result is ``{}`` when nothing is stored. ``dsId`` is currently not
+    used by this helper.
+    """
+    storedOverrides = _ragLimitsHelper.getStoredOverrides(ds, "files")
+    return storedOverrides
+
FOLDER_MIME = "application/vnd.google-apps.folder"
@@ -175,12 +183,13 @@ async def bootstrapGdrive(
dsId = ds.get("id", "")
dsNeutralize = ds.get("neutralize", False)
dsMaxAgeDays = ds.get("maxAgeDays", limits.maxAgeDays)
+ eff = _resolveDataSourceLimits(dsId, ds)
dsLimits = GdriveBootstrapLimits(
- maxItems=limits.maxItems,
- maxBytes=limits.maxBytes,
- maxFileSize=limits.maxFileSize,
+ maxItems=eff.get("maxItems", limits.maxItems),
+ maxBytes=eff.get("maxBytes", limits.maxBytes),
+ maxFileSize=eff.get("maxFileSize", limits.maxFileSize),
skipMimePrefixes=limits.skipMimePrefixes,
- maxDepth=limits.maxDepth,
+ maxDepth=eff.get("maxDepth", limits.maxDepth),
maxAgeDays=dsMaxAgeDays,
neutralize=dsNeutralize,
)
@@ -459,7 +468,11 @@ async def _ingestOne(
processed = result.indexed + result.skippedDuplicate
if progressCb is not None and processed % 5 == 0:
try:
- progressCb(0, f"{processed} Dateien verarbeitet, {result.indexed} indexiert")
+ progressCb(
+ 0,
+ messageKey="{n} Dateien verarbeitet, {indexed} indexiert",
+ messageParams={"n": processed, "indexed": result.indexed},
+ )
except Exception:
pass
logger.info(
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py
index 3130e942..96f9cecf 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py
@@ -474,7 +474,11 @@ async def _ingestMessage(
processed = result.indexed + result.skippedDuplicate
if progressCb is not None and processed % 5 == 0:
try:
- progressCb(0, f"{processed} Mails verarbeitet, {result.indexed} indexiert")
+ progressCb(
+ 0,
+ messageKey="{n} Mails verarbeitet, {indexed} indexiert",
+ messageParams={"n": processed, "indexed": result.indexed},
+ )
except Exception:
pass
if processed % 50 == 0:
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py
index dcf19e39..f95aafd1 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py
@@ -27,11 +27,19 @@ from modules.serviceCenter.services.serviceKnowledge.subWalkerHelpers import (
logger = logging.getLogger(__name__)
-MAX_ITEMS_DEFAULT = 500
-MAX_BYTES_DEFAULT = 200 * 1024 * 1024
-MAX_FILE_SIZE_DEFAULT = 25 * 1024 * 1024
+from modules.serviceCenter.services.serviceKnowledge import _ragLimits as _ragLimitsHelper
+
+_FILES_DEFAULTS = _ragLimitsHelper.FILES_LIMITS_DEFAULT
+MAX_ITEMS_DEFAULT = _FILES_DEFAULTS["maxItems"]
+MAX_BYTES_DEFAULT = _FILES_DEFAULTS["maxBytes"]
+MAX_FILE_SIZE_DEFAULT = _FILES_DEFAULTS["maxFileSize"]
+MAX_DEPTH_DEFAULT = _FILES_DEFAULTS["maxDepth"]
SKIP_MIME_PREFIXES_DEFAULT = ("video/", "audio/")
-MAX_DEPTH_DEFAULT = 4
+
+
+def _resolveDataSourceLimits(dsId: str, ds: Dict[str, Any]) -> Dict[str, int]:
+    """Fetch the RAG-limit overrides stored on a kDrive DataSource.
+
+    Delegates to ``_ragLimits.getStoredOverrides`` with kind ``"files"``;
+    the result is ``{}`` when nothing is stored. ``dsId`` is currently not
+    used by this helper.
+    """
+    storedOverrides = _ragLimitsHelper.getStoredOverrides(ds, "files")
+    return storedOverrides
@dataclass
@@ -143,12 +151,13 @@ async def bootstrapKdrive(
dsPath = ds.get("path", "")
dsId = ds.get("id", "")
dsNeutralize = ds.get("neutralize", False)
+ eff = _resolveDataSourceLimits(dsId, ds)
dsLimits = KdriveBootstrapLimits(
- maxItems=limits.maxItems,
- maxBytes=limits.maxBytes,
- maxFileSize=limits.maxFileSize,
+ maxItems=eff.get("maxItems", limits.maxItems),
+ maxBytes=eff.get("maxBytes", limits.maxBytes),
+ maxFileSize=eff.get("maxFileSize", limits.maxFileSize),
skipMimePrefixes=limits.skipMimePrefixes,
- maxDepth=limits.maxDepth,
+ maxDepth=eff.get("maxDepth", limits.maxDepth),
neutralize=dsNeutralize,
)
@@ -416,7 +425,11 @@ async def _ingestOne(
processed = result.indexed + result.skippedDuplicate
if progressCb is not None and processed % 5 == 0:
try:
- progressCb(0, f"{processed} Dateien verarbeitet, {result.indexed} indexiert")
+ progressCb(
+ 0,
+ messageKey="{n} Dateien verarbeitet, {indexed} indexiert",
+ messageParams={"n": processed, "indexed": result.indexed},
+ )
except Exception:
pass
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py
index 17220d97..e676b156 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py
@@ -460,7 +460,11 @@ async def _ingestMessage(
processed = result.indexed + result.skippedDuplicate
if progressCb is not None and processed % 5 == 0:
try:
- progressCb(0, f"{processed} Mails verarbeitet, {result.indexed} indexiert")
+ progressCb(
+ 0,
+ messageKey="{n} Mails verarbeitet, {indexed} indexiert",
+ messageParams={"n": processed, "indexed": result.indexed},
+ )
except Exception:
pass
if processed % 50 == 0:
diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py
index e06fd36b..87c4c92a 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py
@@ -30,14 +30,27 @@ from modules.serviceCenter.services.serviceKnowledge.subWalkerHelpers import (
logger = logging.getLogger(__name__)
-MAX_ITEMS_DEFAULT = 500
-MAX_BYTES_DEFAULT = 200 * 1024 * 1024
-MAX_FILE_SIZE_DEFAULT = 25 * 1024 * 1024
+from modules.serviceCenter.services.serviceKnowledge import _ragLimits as _ragLimitsHelper
+
+_FILES_DEFAULTS = _ragLimitsHelper.FILES_LIMITS_DEFAULT
+MAX_ITEMS_DEFAULT = _FILES_DEFAULTS["maxItems"]
+MAX_BYTES_DEFAULT = _FILES_DEFAULTS["maxBytes"]
+MAX_FILE_SIZE_DEFAULT = _FILES_DEFAULTS["maxFileSize"]
+MAX_DEPTH_DEFAULT = _FILES_DEFAULTS["maxDepth"]
SKIP_MIME_PREFIXES_DEFAULT = ("video/", "audio/")
-MAX_DEPTH_DEFAULT = 4
MAX_SITES_DEFAULT = 3
+def _resolveDataSourceLimits(dsId: str, ds: Dict[str, Any]) -> Dict[str, int]:
+    """Fetch the RAG-limit overrides stored on a SharePoint DataSource.
+
+    An empty dict means "keep the caller-supplied limits": this helper never
+    substitutes defaults. The walker merges the returned per-DataSource user
+    settings on top of its runtime limits. ``dsId`` is currently not used by
+    this helper.
+    """
+    storedOverrides = _ragLimitsHelper.getStoredOverrides(ds, "files")
+    return storedOverrides
+
+
@dataclass
class SharepointBootstrapLimits:
maxItems: int = MAX_ITEMS_DEFAULT
@@ -165,12 +178,13 @@ async def bootstrapSharepoint(
dsPath = ds.get("path", "")
dsId = ds.get("id", "")
dsNeutralize = ds.get("neutralize", False)
+ eff = _resolveDataSourceLimits(dsId, ds)
dsLimits = SharepointBootstrapLimits(
- maxItems=limits.maxItems,
- maxBytes=limits.maxBytes,
- maxFileSize=limits.maxFileSize,
+ maxItems=eff.get("maxItems", limits.maxItems),
+ maxBytes=eff.get("maxBytes", limits.maxBytes),
+ maxFileSize=eff.get("maxFileSize", limits.maxFileSize),
skipMimePrefixes=limits.skipMimePrefixes,
- maxDepth=limits.maxDepth,
+ maxDepth=eff.get("maxDepth", limits.maxDepth),
maxSites=limits.maxSites,
neutralize=dsNeutralize,
)
@@ -441,7 +455,11 @@ async def _ingestOne(
processed = result.indexed + result.skippedDuplicate
if progressCb is not None and processed % 5 == 0:
try:
- progressCb(0, f"{processed} Dateien verarbeitet, {result.indexed} indexiert")
+ progressCb(
+ 0,
+ messageKey="{n} Dateien verarbeitet, {indexed} indexiert",
+ messageParams={"n": processed, "indexed": result.indexed},
+ )
except Exception:
pass
if processed % 50 == 0:
diff --git a/modules/serviceCenter/services/serviceKnowledge/subPolicyResolver.py b/modules/serviceCenter/services/serviceKnowledge/subPolicyResolver.py
index 10be150d..0deae777 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subPolicyResolver.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subPolicyResolver.py
@@ -1,78 +1,32 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
-"""Resolve effective policies (neutralize, ragIndexEnabled) for DataSource tree hierarchies.
+"""DEPRECATED: Use `_inheritFlags.getEffectiveFlag()` directly.
-Tree-inheritance rule: nearest ancestor DataSource with an explicit value wins.
-If no ancestor has a value, the default (False) is used.
+Thin shim to the new cascade-inherit helper. Kept so external callers don't
+break on import — internal walkers consume pre-resolved dicts via
+`_loadRagEnabledDataSources`.
"""
from __future__ import annotations
-import logging
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List
-logger = logging.getLogger(__name__)
+from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag
def resolveEffectiveNeutralize(
ds: Dict[str, Any],
allDataSources: List[Dict[str, Any]],
) -> bool:
- """Compute effective neutralize by walking up the path tree.
-
- A DataSource at /sites/HR/Documents inherits from /sites/HR if
- that ancestor has neutralize=True and the child has no explicit override.
- """
- ownValue = ds.get("neutralize")
- if ownValue is not None and ownValue is not False:
- return True
- if ownValue is False:
- return False
- return _findAncestorPolicy(ds, allDataSources, "neutralize")
+ """DEPRECATED: use `getEffectiveFlag(ds, 'neutralize', allDataSources)`."""
+ value = getEffectiveFlag(ds, "neutralize", allDataSources)
+ return bool(value)
def resolveEffectiveRagIndexEnabled(
ds: Dict[str, Any],
allDataSources: List[Dict[str, Any]],
) -> bool:
- """Compute effective ragIndexEnabled by walking up the path tree."""
- ownValue = ds.get("ragIndexEnabled")
- if ownValue is True:
- return True
- if ownValue is False:
- return False
- return _findAncestorPolicy(ds, allDataSources, "ragIndexEnabled")
-
-
-def _findAncestorPolicy(
- ds: Dict[str, Any],
- allDataSources: List[Dict[str, Any]],
- field: str,
-) -> bool:
- """Walk ancestors (longest-prefix match) to find an inherited policy value."""
- dsPath = ds.get("path", "")
- connectionId = ds.get("connectionId", "")
- if not dsPath:
- return False
-
- ancestors = []
- for candidate in allDataSources:
- if candidate.get("id") == ds.get("id"):
- continue
- if candidate.get("connectionId") != connectionId:
- continue
- candidatePath = candidate.get("path", "")
- if not candidatePath:
- continue
- if dsPath.startswith(candidatePath) and len(candidatePath) < len(dsPath):
- ancestors.append(candidate)
-
- ancestors.sort(key=lambda a: len(a.get("path", "")), reverse=True)
-
- for ancestor in ancestors:
- val = ancestor.get(field)
- if val is True:
- return True
- if val is False:
- return False
- return False
+ """DEPRECATED: use `getEffectiveFlag(ds, 'ragIndexEnabled', allDataSources)`."""
+ value = getEffectiveFlag(ds, "ragIndexEnabled", allDataSources)
+ return bool(value)
diff --git a/modules/shared/i18nRegistry.py b/modules/shared/i18nRegistry.py
index 7e620f8d..06ccb20e 100644
--- a/modules/shared/i18nRegistry.py
+++ b/modules/shared/i18nRegistry.py
@@ -124,6 +124,48 @@ def t(key: str, context: str = "api", value: str = "") -> str:
return _CACHE.get(lang, {}).get(key, f"[{key}]")
+def resolveJobMessage(messageData: Optional[Dict[str, Any]], lang: Optional[str] = None) -> Optional[str]:
+    """Translate a structured BackgroundJob progress payload.
+
+    ``messageData`` shape (written by ``JobProgressCallback`` when callers
+    pass ``messageKey`` / ``messageParams``)::
+
+        {"key": "{n} Dateien verarbeitet, {indexed} indexiert",
+         "params": {"n": 145, "indexed": 106}}
+
+    The walker call sites use a string-literal ``messageKey=``; the matching
+    ``t("…")`` literal lives in the feature's progress-key registration
+    module (e.g. ``serviceKnowledge/_progressMessages.py``,
+    ``features/trustee/mainTrustee.py``) so the boot sync picks it up.
+
+    This helper is the **server-side** translation hop so route handlers can
+    deliver a fully rendered ``progressMessage`` string to the frontend --
+    the frontend never calls ``t()`` on backend-supplied keys.
+
+    Args:
+        messageData: Payload dict with a non-empty string ``"key"`` and an
+            optional ``"params"`` dict. Anything else yields ``None``.
+        lang: Language to translate into. ``None`` keeps the language that is
+            currently active for ``t()``.
+
+    Returns:
+        The translated template with ``params`` substituted, the bare
+        translated template when substitution is not possible, or ``None``
+        when ``messageData`` carries no usable key.
+    """
+    if not messageData or not isinstance(messageData, dict):
+        return None
+    key = messageData.get("key")
+    if not isinstance(key, str) or not key:
+        return None
+    params = messageData.get("params") or {}
+
+    if lang is not None:
+        # Override the active language for this single lookup only; the
+        # previous value is always restored in ``finally``.
+        token = _CURRENT_LANGUAGE.set(lang)
+        try:
+            template = t(key)
+        finally:
+            _CURRENT_LANGUAGE.reset(token)
+    else:
+        template = t(key)
+
+    if not isinstance(params, dict) or not params:
+        return template
+    try:
+        return template.format(**params)
+    except (KeyError, IndexError, ValueError, AttributeError, TypeError):
+        # A progress message is cosmetic: a template/params mismatch (missing
+        # placeholder, stray brace, attribute or index access on a plain
+        # value, non-string param names) must not break the caller, so fall
+        # back to the untranslated-params template.
+        return template
+
+
def resolveText(value: Any, lang: Optional[str] = None) -> str:
"""Resolve any value to a translated string for the current request language.
diff --git a/scripts/debug_rag_job_result.py b/scripts/debug_rag_job_result.py
new file mode 100644
index 00000000..c107f21e
--- /dev/null
+++ b/scripts/debug_rag_job_result.py
@@ -0,0 +1,70 @@
+"""Diagnose: read a connection.bootstrap job result and print its keys.
+
+Usage (from repo root):
+ python gateway/scripts/debug_rag_job_result.py
+
+Prints the most recent SUCCESS connection.bootstrap job per UserConnection so
+we can see whether the `stoppedAtLimit` key actually landed in the JSONB
+`result` column. If it is missing here, the bug is in the writer (handler or
+_markSuccess); if it is present here but absent in the HTTP response, the bug
+is in routeRagInventory.
+"""
+from __future__ import annotations
+
+import os
+import sys
+import json
+from pathlib import Path
+
+_HERE = Path(__file__).resolve()
+sys.path.insert(0, str(_HERE.parent.parent)) # gateway/
+os.chdir(_HERE.parent.parent)
+
+from modules.shared.configuration import APP_CONFIG # noqa: E402
+from modules.connectors.connectorDbPostgre import getCachedConnector # noqa: E402
+from modules.datamodels.datamodelBackgroundJob import BackgroundJob # noqa: E402
+from modules.routes.routeRagInventory import _flattenJobResult # noqa: E402
+
+
+def _main() -> None:
+    """Print the newest SUCCESS ``connection.bootstrap`` job result per connection.
+
+    Reads all ``BackgroundJob`` rows, keeps the successful bootstrap jobs,
+    orders them newest first by ``createdAt`` and reports only the first job
+    seen for each ``payload.connectionId``, both as raw result keys and as
+    the flattened view produced by ``_flattenJobResult``.
+    """
+    db = getCachedConnector(
+        dbDatabase=APP_CONFIG.get("DB_DATABASE", "poweron_app"),
+        dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
+        dbPort=int(APP_CONFIG.get("DB_PORT", "5432")),
+        dbUser=APP_CONFIG.get("DB_USER"),
+        dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
+    )
+
+    bootstrapJobs = sorted(
+        (
+            job
+            for job in db.getRecordset(BackgroundJob)
+            if job.get("jobType") == "connection.bootstrap" and job.get("status") == "SUCCESS"
+        ),
+        key=lambda job: job.get("createdAt") or 0,
+        reverse=True,
+    )
+    if not bootstrapJobs:
+        print("No SUCCESS connection.bootstrap jobs found.")
+        return
+
+    # Jobs are ordered newest first, so the first hit per connection wins.
+    reportedConnections: set[str] = set()
+    for job in bootstrapJobs:
+        connId = (job.get("payload") or {}).get("connectionId", "")
+        if connId in reportedConnections:
+            continue
+        reportedConnections.add(connId)
+
+        result = job.get("result") or {}
+        resultIsDict = isinstance(result, dict)
+        flat = _flattenJobResult(result) if resultIsDict else {}
+
+        print("=" * 80)
+        print(f"jobId = {job.get('id')}")
+        print(f"connectionId = {connId}")
+        print(f"finishedAt = {job.get('finishedAt')}")
+        print(f"raw keys = {sorted(result.keys()) if resultIsDict else 'N/A'}")
+        print("--- flattened (what the API will return now) ---")
+        print(f" indexed = {flat.get('indexed')}")
+        print(f" skippedDuplicate= {flat.get('skippedDuplicate')}")
+        print(f" skippedPolicy = {flat.get('skippedPolicy')}")
+        print(f" stoppedAtLimit = {flat.get('stoppedAtLimit')!r} <-- KEY CHECK")
+        print(f" limits = {flat.get('limits')}")
+        print(f" bytesProcessed = {flat.get('bytesProcessed')}")
+
+if __name__ == "__main__":
+ _main()
diff --git a/scripts/script_db_migrate_backgroundjob_progress_data.py b/scripts/script_db_migrate_backgroundjob_progress_data.py
new file mode 100644
index 00000000..bc5fc348
--- /dev/null
+++ b/scripts/script_db_migrate_backgroundjob_progress_data.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+"""Migration: Add `progressMessageData` JSONB column to BackgroundJob.
+
+Carries the structured i18n payload (message key + params) from which
+walker progress messages (e.g. "{n} Dateien verarbeitet, {indexed}
+indexiert") are rendered in the user's UI language. `progressMessage` stays
+around as the rendered fallback for older clients and audit logs.
+
+Safe to run multiple times (checks column existence before acting).
+
+Usage:
+ python scripts/script_db_migrate_backgroundjob_progress_data.py [--dry-run]
+"""
+
+import os
+import sys
+import argparse
+import logging
+from pathlib import Path
+
+scriptPath = Path(__file__).resolve()
+gatewayPath = scriptPath.parent.parent
+sys.path.insert(0, str(gatewayPath))
+os.chdir(str(gatewayPath))
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", force=True)
+logger = logging.getLogger(__name__)
+
+import psycopg2
+from modules.shared.configuration import APP_CONFIG
+
+
+def _getConnection():
+    """Open a psycopg2 connection configured from the ``DB_*`` entries of APP_CONFIG."""
+    connectArgs = {
+        "host": APP_CONFIG.get("DB_HOST", "localhost"),
+        "port": int(APP_CONFIG.get("DB_PORT", "5432")),
+        "database": APP_CONFIG.get("DB_DATABASE", "poweron_app"),
+        "user": APP_CONFIG.get("DB_USER"),
+        "password": APP_CONFIG.get("DB_PASSWORD_SECRET"),
+    }
+    return psycopg2.connect(**connectArgs)
+
+
+def _columnExists(cur, table: str, column: str) -> bool:
+    """Return True when ``public.<table>`` has a column named ``column``."""
+    lookupSql = """SELECT 1 FROM information_schema.columns
+ WHERE table_schema = 'public' AND table_name = %s AND column_name = %s"""
+    cur.execute(lookupSql, (table, column))
+    match = cur.fetchone()
+    return match is not None
+
+
+def _tableExists(cur, table: str) -> bool:
+    """Return True when a table named ``table`` exists in the ``public`` schema."""
+    lookupSql = """SELECT 1 FROM information_schema.tables
+ WHERE table_schema = 'public' AND table_name = %s"""
+    cur.execute(lookupSql, (table,))
+    match = cur.fetchone()
+    return match is not None
+
+
+def migrate(dryRun: bool = False):
+    """Add the ``progressMessageData`` JSONB column to ``BackgroundJob`` if missing.
+
+    The step is skipped when the table does not exist yet or the column is
+    already present, so the script can be re-run safely.
+
+    Args:
+        dryRun: When True, log the statement that would run but neither
+            execute nor commit it.
+
+    Raises:
+        Whatever the database driver raises; the cursor and connection are
+        closed before the error propagates.
+    """
+    conn = _getConnection()
+    cur = None
+    try:
+        conn.autocommit = False
+        cur = conn.cursor()
+
+        table, column = "BackgroundJob", "progressMessageData"
+        executed = []
+
+        if not _tableExists(cur, table):
+            logger.warning("SKIP: table %s does not exist yet (will be created on next ORM init)", table)
+        elif _columnExists(cur, table, column):
+            logger.info("SKIP: %s.%s already exists", table, column)
+        else:
+            sql = f'ALTER TABLE public."{table}" ADD COLUMN "{column}" JSONB DEFAULT NULL;'
+            logger.info("EXEC: %s", sql)
+            if not dryRun:
+                cur.execute(sql)
+            executed.append(sql)
+
+        if not dryRun and executed:
+            conn.commit()
+            logger.info("Migration committed (%d statements)", len(executed))
+        elif dryRun and executed:
+            conn.rollback()
+            logger.info("DRY RUN -- would execute %d statements", len(executed))
+        else:
+            logger.info("Nothing to do -- schema already up to date")
+    finally:
+        # Always release the cursor and connection, also when a statement
+        # fails; psycopg2 discards uncommitted changes on close.
+        if cur is not None:
+            cur.close()
+        conn.close()
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument("--dry-run", action="store_true", help="Print SQL without executing")
+ args = parser.parse_args()
+ migrate(dryRun=args.dry_run)
diff --git a/scripts/script_db_migrate_datasource_inherit.py b/scripts/script_db_migrate_datasource_inherit.py
new file mode 100644
index 00000000..3444cbee
--- /dev/null
+++ b/scripts/script_db_migrate_datasource_inherit.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python3
+"""Migration: Drop NOT NULL on DataSource/FeatureDataSource cascade-inherit flags.
+
+Switches to three-valued semantics (NULL = inherit, True/False = explicit) for:
+ - DataSource.neutralize, ragIndexEnabled, scope
+ - FeatureDataSource.neutralize, scope
+
+Existing rows keep their explicit values; only new records (or explicit reset
+via cascade) start with NULL. Migration is non-destructive and idempotent.
+
+Safe to run multiple times.
+
+Usage:
+ python scripts/script_db_migrate_datasource_inherit.py [--dry-run]
+"""
+
+import os
+import sys
+import argparse
+import logging
+from pathlib import Path
+
+scriptPath = Path(__file__).resolve()
+gatewayPath = scriptPath.parent.parent
+sys.path.insert(0, str(gatewayPath))
+os.chdir(str(gatewayPath))
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", force=True)
+logger = logging.getLogger(__name__)
+
+import psycopg2
+from modules.shared.configuration import APP_CONFIG
+
+
+def _getConnection():
+    """Open a psycopg2 connection configured from the ``DB_*`` entries of APP_CONFIG."""
+    connectArgs = {
+        "host": APP_CONFIG.get("DB_HOST", "localhost"),
+        "port": int(APP_CONFIG.get("DB_PORT", "5432")),
+        "database": APP_CONFIG.get("DB_DATABASE", "poweron_app"),
+        "user": APP_CONFIG.get("DB_USER"),
+        "password": APP_CONFIG.get("DB_PASSWORD_SECRET"),
+    }
+    return psycopg2.connect(**connectArgs)
+
+
+def _tableExists(cur, table: str) -> bool:
+    """Return True when a table named ``table`` exists in the ``public`` schema."""
+    lookupSql = """SELECT 1 FROM information_schema.tables
+ WHERE table_schema = 'public' AND table_name = %s"""
+    cur.execute(lookupSql, (table,))
+    match = cur.fetchone()
+    return match is not None
+
+
+def _columnIsNullable(cur, table: str, column: str) -> bool:
+    """Return True when ``public.<table>.<column>`` is reported as nullable.
+
+    A column that is not listed in ``information_schema.columns`` yields
+    False, the same answer as a NOT NULL column.
+    """
+    lookupSql = """SELECT is_nullable FROM information_schema.columns
+ WHERE table_schema = 'public' AND table_name = %s AND column_name = %s"""
+    cur.execute(lookupSql, (table, column))
+    found = cur.fetchone()
+    return bool(found) and found[0] == "YES"
+
+
+def _columnExists(cur, table: str, column: str) -> bool:
+    """Return True when ``public.<table>`` has a column named ``column``."""
+    cur.execute(
+        """SELECT 1 FROM information_schema.columns
+ WHERE table_schema = 'public' AND table_name = %s AND column_name = %s""",
+        (table, column),
+    )
+    return cur.fetchone() is not None
+
+
+def migrate(dryRun: bool = False):
+    """Drop the NOT NULL constraint on the cascade-inherit flag columns.
+
+    Targets are ``DataSource.neutralize/ragIndexEnabled/scope`` and
+    ``FeatureDataSource.neutralize/scope``. A target is skipped when its
+    table or column does not exist, or when the column is already nullable,
+    so the script can be re-run safely.
+
+    Args:
+        dryRun: When True, log the statements that would run but neither
+            execute nor commit them.
+
+    Raises:
+        Whatever the database driver raises; the cursor and connection are
+        closed before the error propagates.
+    """
+    conn = _getConnection()
+    cur = None
+    try:
+        conn.autocommit = False
+        cur = conn.cursor()
+
+        targets = [
+            ("DataSource", "neutralize"),
+            ("DataSource", "ragIndexEnabled"),
+            ("DataSource", "scope"),
+            ("FeatureDataSource", "neutralize"),
+            ("FeatureDataSource", "scope"),
+        ]
+
+        executed = []
+        for table, column in targets:
+            if not _tableExists(cur, table):
+                logger.warning("SKIP: table %s does not exist yet", table)
+                continue
+            # _columnIsNullable() answers False for a missing column too, and
+            # ALTER COLUMN on a missing column would abort the whole run.
+            if not _columnExists(cur, table, column):
+                logger.warning("SKIP: column %s.%s does not exist yet", table, column)
+                continue
+            if _columnIsNullable(cur, table, column):
+                logger.info("SKIP: %s.%s already nullable", table, column)
+                continue
+            sql = f'ALTER TABLE public."{table}" ALTER COLUMN "{column}" DROP NOT NULL;'
+            logger.info("EXEC: %s", sql)
+            if not dryRun:
+                cur.execute(sql)
+            executed.append(sql)
+
+        if not dryRun and executed:
+            conn.commit()
+            logger.info("Migration committed (%d statements)", len(executed))
+        elif dryRun and executed:
+            conn.rollback()
+            logger.info("DRY RUN -- would execute %d statements", len(executed))
+        else:
+            logger.info("Nothing to do -- schema already nullable")
+    finally:
+        # Always release the cursor and connection, also when a statement
+        # fails; psycopg2 discards uncommitted changes on close.
+        if cur is not None:
+            cur.close()
+        conn.close()
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument("--dry-run", action="store_true", help="Print SQL without executing")
+ args = parser.parse_args()
+ migrate(dryRun=args.dry_run)
diff --git a/scripts/script_db_migrate_datasource_settings.py b/scripts/script_db_migrate_datasource_settings.py
new file mode 100644
index 00000000..9e821221
--- /dev/null
+++ b/scripts/script_db_migrate_datasource_settings.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python3
+"""Migration: Add `settings` JSONB column to DataSource and FeatureDataSource.
+
+This is a one-off migration for the UDB DataSource Settings (Settings-Icon)
+feature: walkers read RAG limits (maxBytes, maxFileSize, maxItems, maxDepth)
+from this JSON blob, the UI edits them. Existing rows get NULL until the
+next bootstrap lazy-fills sensible defaults from `_ragLimits.RAG_LIMITS_DEFAULT`.
+
+Safe to run multiple times (checks column existence before acting).
+
+Usage:
+ python scripts/script_db_migrate_datasource_settings.py [--dry-run]
+"""
+
+import os
+import sys
+import argparse
+import logging
+from pathlib import Path
+
+scriptPath = Path(__file__).resolve()
+gatewayPath = scriptPath.parent.parent
+sys.path.insert(0, str(gatewayPath))
+os.chdir(str(gatewayPath))
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", force=True)
+logger = logging.getLogger(__name__)
+
+import psycopg2
+from modules.shared.configuration import APP_CONFIG
+
+
+def _getConnection():
+    """Open a psycopg2 connection configured from the ``DB_*`` entries of APP_CONFIG."""
+    connectArgs = {
+        "host": APP_CONFIG.get("DB_HOST", "localhost"),
+        "port": int(APP_CONFIG.get("DB_PORT", "5432")),
+        "database": APP_CONFIG.get("DB_DATABASE", "poweron_app"),
+        "user": APP_CONFIG.get("DB_USER"),
+        "password": APP_CONFIG.get("DB_PASSWORD_SECRET"),
+    }
+    return psycopg2.connect(**connectArgs)
+
+
+def _columnExists(cur, table: str, column: str) -> bool:
+    """Return True when ``public.<table>`` has a column named ``column``."""
+    lookupSql = """SELECT 1 FROM information_schema.columns
+ WHERE table_schema = 'public' AND table_name = %s AND column_name = %s"""
+    cur.execute(lookupSql, (table, column))
+    match = cur.fetchone()
+    return match is not None
+
+
+def _tableExists(cur, table: str) -> bool:
+    """Return True when a table named ``table`` exists in the ``public`` schema."""
+    lookupSql = """SELECT 1 FROM information_schema.tables
+ WHERE table_schema = 'public' AND table_name = %s"""
+    cur.execute(lookupSql, (table,))
+    match = cur.fetchone()
+    return match is not None
+
+
+def migrate(dryRun: bool = False):
+    """Add the ``settings`` JSONB column to ``DataSource`` and ``FeatureDataSource``.
+
+    A target is skipped when its table does not exist yet or the column is
+    already present, so the script can be re-run safely.
+
+    Args:
+        dryRun: When True, log the statements that would run but neither
+            execute nor commit them.
+
+    Raises:
+        Whatever the database driver raises; the cursor and connection are
+        closed before the error propagates.
+    """
+    conn = _getConnection()
+    cur = None
+    try:
+        conn.autocommit = False
+        cur = conn.cursor()
+
+        targets = [
+            ("DataSource", "settings"),
+            ("FeatureDataSource", "settings"),
+        ]
+
+        executed = []
+        for table, column in targets:
+            if not _tableExists(cur, table):
+                logger.warning("SKIP: table %s does not exist yet (will be created on next ORM init)", table)
+                continue
+            if _columnExists(cur, table, column):
+                logger.info("SKIP: %s.%s already exists", table, column)
+                continue
+            sql = f'ALTER TABLE public."{table}" ADD COLUMN "{column}" JSONB DEFAULT NULL;'
+            logger.info("EXEC: %s", sql)
+            if not dryRun:
+                cur.execute(sql)
+            executed.append(sql)
+
+        if not dryRun and executed:
+            conn.commit()
+            logger.info("Migration committed (%d statements)", len(executed))
+        elif dryRun and executed:
+            conn.rollback()
+            logger.info("DRY RUN -- would execute %d statements", len(executed))
+        else:
+            logger.info("Nothing to do -- schema already up to date")
+    finally:
+        # Always release the cursor and connection, also when a statement
+        # fails; psycopg2 discards uncommitted changes on close.
+        if cur is not None:
+            cur.close()
+        conn.close()
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument("--dry-run", action="store_true", help="Print SQL without executing")
+ args = parser.parse_args()
+ migrate(dryRun=args.dry_run)
diff --git a/tests/unit/services/test_costEstimate.py b/tests/unit/services/test_costEstimate.py
new file mode 100644
index 00000000..e49aca6a
--- /dev/null
+++ b/tests/unit/services/test_costEstimate.py
@@ -0,0 +1,55 @@
+"""Unit tests for `_costEstimate` heuristic.
+
+Validates the output shape, basic formulas, and that 'basis' annotations
+are always present (the user-facing transparency contract).
+"""
+
+from __future__ import annotations
+
+import unittest
+
+from modules.serviceCenter.services.serviceKnowledge import _costEstimate
+
+
+class TestCostEstimate(unittest.TestCase):
+    """Shape and formula checks for ``_costEstimate.estimateBootstrapCost``."""
+
+    def test_files_shape(self):
+        # The result must expose the totals plus a 'basis' section that
+        # explains how they were derived.
+        estimate = _costEstimate.estimateBootstrapCost(
+            {"maxBytes": 200 * 1024 * 1024}, kind="files",
+        )
+        for topLevelKey in ("estimatedTokens", "estimatedUsd", "basis"):
+            self.assertIn(topLevelKey, estimate)
+        basis = estimate["basis"]
+        self.assertIn("assumptions", basis)
+        self.assertIn("formula", basis["assumptions"])
+        self.assertIn("notes", basis)
+
+    def test_files_doubling_maxBytes_doubles_tokens(self):
+        mib = 1024 * 1024
+        smaller = _costEstimate.estimateBootstrapCost({"maxBytes": 100 * mib}, kind="files")
+        larger = _costEstimate.estimateBootstrapCost({"maxBytes": 200 * mib}, kind="files")
+        self.assertEqual(larger["estimatedTokens"], smaller["estimatedTokens"] * 2)
+
+    def test_clickup_uses_tasks_and_workspaces(self):
+        clickupLimits = {"maxTasks": 100, "maxWorkspaces": 2, "maxListsPerWorkspace": 10}
+        estimate = _costEstimate.estimateBootstrapCost(clickupLimits, kind="clickup")
+        self.assertEqual(
+            estimate["estimatedTokens"],
+            100 * 2 * _costEstimate.DEFAULT_TOKENS_PER_ITEM,
+        )
+
+    def test_unknown_kind_returns_zero(self):
+        estimate = _costEstimate.estimateBootstrapCost({}, kind="totally-unknown")
+        self.assertEqual(estimate["estimatedTokens"], 0)
+        self.assertEqual(estimate["estimatedUsd"], 0.0)
+
+    def test_usd_is_rounded_4_decimals(self):
+        estimate = _costEstimate.estimateBootstrapCost({"maxBytes": 1024 * 1024}, kind="files")
+        usd = estimate["estimatedUsd"]
+        self.assertEqual(usd, round(usd, 4))
+
+    def test_basis_includes_input_limits(self):
+        estimate = _costEstimate.estimateBootstrapCost({"maxBytes": 42}, kind="files")
+        echoedLimits = estimate["basis"]["limits"]
+        self.assertEqual(echoedLimits["maxBytes"], 42)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/unit/services/test_inheritFlags.py b/tests/unit/services/test_inheritFlags.py
new file mode 100644
index 00000000..b177e767
--- /dev/null
+++ b/tests/unit/services/test_inheritFlags.py
@@ -0,0 +1,330 @@
+"""Unit tests for `_inheritFlags` cascade-inherit helpers.
+
+Verifies:
+- getEffectiveFlag walks ancestors via path-prefix matching
+- root default is False (or 'personal' for scope) when nothing explicit in chain
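+- only same-connectionId ancestors are considered; sourceType must match as well,
+  except for the connection-root (authority sourceType, path '/'), which spans sourceTypes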
+- cascadeResetDescendants only touches descendants with explicit values for THAT flag
+- '/' is treated as ancestor of every non-root path
+- '/foo' is NOT ancestor of '/foobar' (must require '/' separator)
+"""
+
+from __future__ import annotations
+
+import unittest
+from typing import List
+from unittest.mock import MagicMock
+
+from modules.serviceCenter.services.serviceKnowledge import _inheritFlags
+
+
+def _ds(idVal: str, path: str, **flags) -> dict:
+    """Return a DataSource fixture dict.
+
+    Defaults: connection ``conn-1``, sourceType ``sharepointFolder`` and all
+    three flags unset (``None``). Keyword ``flags`` override or extend them.
+    """
+    return {
+        "id": idVal,
+        "connectionId": "conn-1",
+        "sourceType": "sharepointFolder",
+        "path": path,
+        "neutralize": None,
+        "ragIndexEnabled": None,
+        "scope": None,
+        # Unpacked last so caller-supplied values win over the defaults above.
+        **flags,
+    }
+
+
+class TestEffectiveFlag(unittest.TestCase):
+    """Resolution rules of `_inheritFlags.getEffectiveFlag` on `_ds` fixtures."""
+
+    def test_explicit_own_value_wins(self):
+        """The node's own explicit value is used even when the root says otherwise."""
+        rootDs = _ds("r", "/", neutralize=False)
+        leafDs = _ds("l", "/folder/sub", neutralize=True)
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "neutralize", [rootDs, leafDs])
+        self.assertTrue(effective)
+
+    def test_inherits_from_root_when_own_is_none(self):
+        """An unset flag picks up the root's explicit value."""
+        rootDs = _ds("r", "/", neutralize=True)
+        leafDs = _ds("l", "/folder/sub")
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "neutralize", [rootDs, leafDs])
+        self.assertTrue(effective)
+
+    def test_default_false_when_chain_empty(self):
+        """With only the node itself in the list, the result is falsy."""
+        leafDs = _ds("l", "/folder/sub")
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "neutralize", [leafDs])
+        self.assertFalse(effective)
+
+    def test_nearest_ancestor_wins_over_distant(self):
+        """`/folder` (True) outranks `/` (False) for a node at `/folder/sub`."""
+        rootDs = _ds("r", "/", neutralize=False)
+        midDs = _ds("m", "/folder", neutralize=True)
+        leafDs = _ds("l", "/folder/sub")
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "neutralize", [rootDs, midDs, leafDs])
+        self.assertTrue(effective)
+
+    def test_different_connection_ignored(self):
+        """A root belonging to another connectionId contributes nothing."""
+        foreignRoot = _ds("o", "/", connectionId="conn-2", neutralize=True)
+        leafDs = _ds("l", "/folder")
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "neutralize", [foreignRoot, leafDs])
+        self.assertFalse(effective)
+
+    def test_different_sourcetype_ignored(self):
+        """An `outlookFolder` root is not an ancestor of a `sharepointFolder` node."""
+        foreignRoot = _ds("o", "/", sourceType="outlookFolder", neutralize=True)
+        leafDs = _ds("l", "/folder")
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "neutralize", [foreignRoot, leafDs])
+        self.assertFalse(effective)
+
+    def test_path_separator_required(self):
+        """`/foo` is a plain string prefix of `/foobar`, not its ancestor."""
+        lookalike = _ds("a", "/foo", neutralize=True)
+        leafDs = _ds("l", "/foobar")
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "neutralize", [lookalike, leafDs])
+        self.assertFalse(effective)
+
+    def test_root_is_ancestor_of_everything(self):
+        """`/` passes its value down to an arbitrarily deep path."""
+        rootDs = _ds("r", "/", neutralize=True)
+        leafDs = _ds("l", "/anything/anywhere")
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "neutralize", [rootDs, leafDs])
+        self.assertTrue(effective)
+
+    def test_scope_inheritance_with_string_default(self):
+        """The string-valued `scope` flag is inherited like the boolean flags."""
+        rootDs = _ds("r", "/", scope="mandate")
+        leafDs = _ds("l", "/folder")
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "scope", [rootDs, leafDs])
+        self.assertEqual(effective, "mandate")
+
+    def test_scope_default_personal_when_empty(self):
+        """`scope` falls back to 'personal' when nothing in the chain sets it."""
+        leafDs = _ds("l", "/folder")
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "scope", [leafDs])
+        self.assertEqual(effective, "personal")
+
+    def test_unknown_flag_raises(self):
+        """A flag name the helper does not know raises ValueError."""
+        leafDs = _ds("l", "/")
+        self.assertRaises(ValueError, _inheritFlags.getEffectiveFlag, leafDs, "unknownFlag", [leafDs])
+
+    def test_explicit_false_overrides_inherited_true(self):
+        """An explicit False on the child is kept even though the root is True."""
+        rootDs = _ds("r", "/", neutralize=True)
+        leafDs = _ds("l", "/folder", neutralize=False)
+        effective = _inheritFlags.getEffectiveFlag(leafDs, "neutralize", [rootDs, leafDs])
+        self.assertFalse(effective)
+
+    def test_connection_root_inherits_cross_sourcetype(self):
+        """The connection-root (sourceType=authority, path='/') feeds every service root."""
+        connRoot = _ds("conn", "/", sourceType="msft", neutralize=True)
+        spService = _ds("sp", "/", sourceType="sharepointFolder")
+        olService = _ds("ol", "/", sourceType="outlookFolder")
+        for serviceDs in (spService, olService):
+            effective = _inheritFlags.getEffectiveFlag(
+                serviceDs, "neutralize", [connRoot, spService, olService],
+            )
+            self.assertTrue(effective)
+
+    def test_same_sourcetype_ancestor_wins_over_connection_root(self):
+        """A same-sourceType service root (False) beats the connection-root (True)."""
+        connRoot = _ds("conn", "/", sourceType="msft", neutralize=True)
+        spRoot = _ds("sp", "/", sourceType="sharepointFolder", neutralize=False)
+        spLeaf = _ds("spl", "/sites/x", sourceType="sharepointFolder")
+        effective = _inheritFlags.getEffectiveFlag(spLeaf, "neutralize", [connRoot, spRoot, spLeaf])
+        self.assertFalse(effective)
+
+    def test_connection_root_does_not_self_inherit(self):
+        """Resolving the connection-root against itself terminates and is falsy."""
+        connRoot = _ds("conn", "/", sourceType="msft")
+        effective = _inheritFlags.getEffectiveFlag(connRoot, "neutralize", [connRoot])
+        self.assertFalse(effective)
+
+
+class TestCascadeReset(unittest.TestCase):
+    """`_inheritFlags.cascadeResetDescendants` against a mocked root interface."""
+
+    def _makeRootIf(self, dataSources: List[dict]):
+        """Return ``(rootIf, modified)`` for the given fixtures.
+
+        ``rootIf.db.getRecordset`` always returns ``dataSources``; every
+        ``rootIf.db.recordModify`` call is logged in ``modified`` as
+        ``(recordId, fields)``.
+        """
+        modified = []
+        rootIf = MagicMock()
+        rootIf.db.getRecordset = MagicMock(return_value=dataSources)
+        rootIf.db.recordModify = MagicMock(
+            side_effect=lambda model, recordId, fields: modified.append((recordId, fields)),
+        )
+        return rootIf, modified
+
+    def test_resets_only_explicit_descendants(self):
+        """Only the descendant holding an explicit value is reset to None."""
+        parentDs = _ds("p", "/sites", neutralize=True)
+        fixtures = [
+            parentDs,
+            _ds("c1", "/sites/folder1", neutralize=False),  # explicit -> must be reset
+            _ds("c2", "/sites/folder2"),  # already inheriting -> must stay untouched
+            _ds("s", "/other", neutralize=True),  # outside /sites -> not a descendant
+        ]
+        rootIf, modified = self._makeRootIf(fixtures)
+
+        affected = _inheritFlags.cascadeResetDescendants(rootIf, parentDs, "neutralize")
+
+        self.assertEqual(affected, 1)
+        self.assertEqual(modified, [("c1", {"neutralize": None})])
+
+    def test_does_not_touch_other_flags(self):
+        """The written field set contains the cascaded flag and nothing else."""
+        parentDs = _ds("p", "/sites", neutralize=True)
+        childDs = _ds("c", "/sites/sub", neutralize=False, ragIndexEnabled=True)
+        rootIf, modified = self._makeRootIf([parentDs, childDs])
+
+        _inheritFlags.cascadeResetDescendants(rootIf, parentDs, "neutralize")
+
+        # Exact equality also proves ragIndexEnabled and scope were not written.
+        self.assertEqual(modified, [("c", {"neutralize": None})])
+
+    def test_does_not_cross_sourcetype(self):
+        """A non-connection-root parent leaves other sourceTypes alone."""
+        parentDs = _ds("p", "/", neutralize=True, sourceType="sharepointFolder")
+        foreignDescendant = _ds("o", "/anything", neutralize=False, sourceType="outlookFolder")
+        rootIf, modified = self._makeRootIf([parentDs, foreignDescendant])
+
+        affected = _inheritFlags.cascadeResetDescendants(rootIf, parentDs, "neutralize")
+
+        self.assertEqual(affected, 0)
+        self.assertEqual(modified, [])
+
+    def test_connection_root_cascades_cross_sourcetype(self):
+        """A connection-root toggle resets every explicit DS of that connection."""
+        connRoot = _ds("conn", "/", sourceType="msft", neutralize=True)
+        fixtures = [
+            connRoot,
+            _ds("sp", "/", sourceType="sharepointFolder", neutralize=False),
+            _ds("ol", "/", sourceType="outlookFolder"),
+            _ds("sp-leaf", "/sites/x", sourceType="sharepointFolder", neutralize=True),
+        ]
+        rootIf, modified = self._makeRootIf(fixtures)
+
+        affected = _inheritFlags.cascadeResetDescendants(rootIf, connRoot, "neutralize")
+
+        # "sp" and "sp-leaf" carried explicit values; "ol" was inheriting already.
+        self.assertEqual(affected, 2)
+        touchedIds = {recordId for recordId, _ in modified}
+        self.assertEqual(touchedIds, {"sp", "sp-leaf"})
+        for _, writtenFields in modified:
+            self.assertEqual(writtenFields, {"neutralize": None})
+
+    def test_unknown_flag_raises(self):
+        """A flag name the helper does not know raises ValueError."""
+        parentDs = _ds("p", "/", neutralize=True)
+        rootIf, _ = self._makeRootIf([parentDs])
+        self.assertRaises(
+            ValueError, _inheritFlags.cascadeResetDescendants, rootIf, parentDs, "unknownFlag",
+        )
+
+
+def _fds(idVal: str, *, tableName: str, recordFilter=None, **flags) -> dict:
+    """Return a FeatureDataSource fixture dict.
+
+    Defaults: workspace instance ``ws-1`` with ``neutralize`` and ``scope``
+    unset (``None``). Keyword ``flags`` override or extend these defaults.
+    """
+    return {
+        "id": idVal,
+        "workspaceInstanceId": "ws-1",
+        "tableName": tableName,
+        "recordFilter": recordFilter,
+        "neutralize": None,
+        "scope": None,
+        # Unpacked last so caller-supplied values win over the defaults above.
+        **flags,
+    }
+
+
+class TestFdsClassifyAndAncestry(unittest.TestCase):
+    """Level classification (`_fdsClassify`) and ancestry (`_fdsIsAncestor`) of FDS rows."""
+
+    def test_classify_workspace_wildcard(self):
+        """tableName '*' classifies as the workspace level."""
+        level = _inheritFlags._fdsClassify(_fds("a", tableName="*"))
+        self.assertEqual(level, "workspace")
+
+    def test_classify_table_wildcard(self):
+        """A named table without recordFilter classifies as the table level."""
+        level = _inheritFlags._fdsClassify(_fds("a", tableName="Pos"))
+        self.assertEqual(level, "table")
+
+    def test_classify_record_specific(self):
+        """A named table plus recordFilter classifies as the record level."""
+        recordFds = _fds("a", tableName="Pos", recordFilter={"id": "r-1"})
+        level = _inheritFlags._fdsClassify(recordFds)
+        self.assertEqual(level, "record")
+
+    def test_workspace_is_ancestor_of_table_and_record(self):
+        """The workspace wildcard sits above both table and record entries."""
+        workspaceFds = _fds("ws", tableName="*")
+        tableFds = _fds("t", tableName="Pos")
+        recordFds = _fds("r", tableName="Pos", recordFilter={"id": "1"})
+        for descendant in (tableFds, recordFds):
+            self.assertTrue(_inheritFlags._fdsIsAncestor(workspaceFds, descendant))
+
+    def test_table_is_ancestor_of_record_same_table_only(self):
+        """A table entry covers records of its own table, not of another one."""
+        tableFds = _fds("t", tableName="Pos")
+        sameTableRecord = _fds("r1", tableName="Pos", recordFilter={"id": "1"})
+        otherTableRecord = _fds("r2", tableName="Other", recordFilter={"id": "1"})
+        self.assertTrue(_inheritFlags._fdsIsAncestor(tableFds, sameTableRecord))
+        self.assertFalse(_inheritFlags._fdsIsAncestor(tableFds, otherTableRecord))
+
+    def test_record_has_no_descendants(self):
+        """A record entry is not an ancestor of its own table entry."""
+        recordFds = _fds("r", tableName="Pos", recordFilter={"id": "1"})
+        tableFds = _fds("t", tableName="Pos")
+        self.assertFalse(_inheritFlags._fdsIsAncestor(recordFds, tableFds))
+
+    def test_no_cross_workspace_ancestry(self):
+        """Ancestry never spans two different workspaceInstanceIds."""
+        workspaceFds = _fds("ws", tableName="*", workspaceInstanceId="ws-A")
+        recordFds = _fds("r", tableName="Pos", recordFilter={"id": "1"}, workspaceInstanceId="ws-B")
+        self.assertFalse(_inheritFlags._fdsIsAncestor(workspaceFds, recordFds))
+
+
+class TestFdsEffectiveFlag(unittest.TestCase):
+    """Resolution rules of `_inheritFlags.getEffectiveFlagFds` on `_fds` fixtures."""
+
+    def test_own_explicit_wins(self):
+        """The record's own explicit value beats the workspace wildcard."""
+        workspaceFds = _fds("ws", tableName="*", neutralize=False)
+        recordFds = _fds("r", tableName="Pos", recordFilter={"id": "1"}, neutralize=True)
+        effective = _inheritFlags.getEffectiveFlagFds(recordFds, "neutralize", [workspaceFds, recordFds])
+        self.assertTrue(effective)
+
+    def test_inherits_from_table_wildcard(self):
+        """An unset record picks up the value of its table entry."""
+        tableFds = _fds("t", tableName="Pos", neutralize=True)
+        recordFds = _fds("r", tableName="Pos", recordFilter={"id": "1"})
+        effective = _inheritFlags.getEffectiveFlagFds(recordFds, "neutralize", [tableFds, recordFds])
+        self.assertTrue(effective)
+
+    def test_table_wildcard_beats_workspace_wildcard(self):
+        """The table entry (True) outranks the workspace entry (False)."""
+        workspaceFds = _fds("ws", tableName="*", neutralize=False)
+        tableFds = _fds("t", tableName="Pos", neutralize=True)
+        recordFds = _fds("r", tableName="Pos", recordFilter={"id": "1"})
+        effective = _inheritFlags.getEffectiveFlagFds(
+            recordFds, "neutralize", [workspaceFds, tableFds, recordFds],
+        )
+        self.assertTrue(effective)
+
+    def test_workspace_wildcard_inherits_when_no_table(self):
+        """Without a table entry the workspace wildcard supplies the value."""
+        workspaceFds = _fds("ws", tableName="*", neutralize=True)
+        recordFds = _fds("r", tableName="Pos", recordFilter={"id": "1"})
+        effective = _inheritFlags.getEffectiveFlagFds(recordFds, "neutralize", [workspaceFds, recordFds])
+        self.assertTrue(effective)
+
+    def test_default_false_when_chain_empty(self):
+        """With only the record itself in the list, the result is falsy."""
+        recordFds = _fds("r", tableName="Pos", recordFilter={"id": "1"})
+        effective = _inheritFlags.getEffectiveFlagFds(recordFds, "neutralize", [recordFds])
+        self.assertFalse(effective)
+
+    def test_unknown_flag_raises(self):
+        """`ragIndexEnabled` is rejected for FDS rows with ValueError."""
+        recordFds = _fds("r", tableName="*")
+        self.assertRaises(
+            ValueError, _inheritFlags.getEffectiveFlagFds, recordFds, "ragIndexEnabled", [recordFds],
+        )
+
+
+class TestFdsCascadeReset(unittest.TestCase):
+    """`_inheritFlags.cascadeResetDescendantsFds` against a mocked root interface."""
+
+    def _makeRootIf(self, fdses):
+        """Return ``(rootIf, modified)`` for the given fixtures.
+
+        ``rootIf.db.getRecordset`` always returns ``fdses``; every
+        ``rootIf.db.recordModify`` call is logged in ``modified`` as
+        ``(recordId, fields)``.
+        """
+        modified = []
+        rootIf = MagicMock()
+        rootIf.db.getRecordset = MagicMock(return_value=fdses)
+        rootIf.db.recordModify = MagicMock(
+            side_effect=lambda model, recordId, fields: modified.append((recordId, fields)),
+        )
+        return rootIf, modified
+
+    def test_workspace_cascades_to_all_explicit_descendants(self):
+        """A workspace toggle resets explicit table and record entries only."""
+        workspaceFds = _fds("ws", tableName="*", neutralize=True)
+        fixtures = [
+            workspaceFds,
+            _fds("t", tableName="Pos", neutralize=False),  # explicit table entry
+            _fds("t2", tableName="Other"),  # inheriting table entry
+            _fds("r", tableName="Pos", recordFilter={"id": "1"}, neutralize=True),  # explicit record
+        ]
+        rootIf, modified = self._makeRootIf(fixtures)
+
+        affected = _inheritFlags.cascadeResetDescendantsFds(rootIf, workspaceFds, "neutralize")
+
+        self.assertEqual(affected, 2)
+        touchedIds = {recordId for recordId, _ in modified}
+        self.assertEqual(touchedIds, {"t", "r"})
+
+    def test_table_cascades_only_to_same_table_records(self):
+        """A table toggle reaches records of that table and no other."""
+        tableFds = _fds("t", tableName="Pos", neutralize=True)
+        sameTableRecord = _fds("r1", tableName="Pos", recordFilter={"id": "1"}, neutralize=False)
+        otherTableRecord = _fds("r2", tableName="Other", recordFilter={"id": "1"}, neutralize=False)
+        rootIf, modified = self._makeRootIf([tableFds, sameTableRecord, otherTableRecord])
+
+        affected = _inheritFlags.cascadeResetDescendantsFds(rootIf, tableFds, "neutralize")
+
+        self.assertEqual(affected, 1)
+        self.assertEqual(modified, [("r1", {"neutralize": None})])
+
+    def test_record_has_no_cascade(self):
+        """A record entry has nothing below it, so nothing is modified."""
+        recordFds = _fds("r", tableName="Pos", recordFilter={"id": "1"}, neutralize=True)
+        rootIf, modified = self._makeRootIf([recordFds])
+
+        affected = _inheritFlags.cascadeResetDescendantsFds(rootIf, recordFds, "neutralize")
+
+        self.assertEqual(affected, 0)
+        self.assertEqual(modified, [])
+
+    def test_unknown_flag_raises(self):
+        """`ragIndexEnabled` is rejected for FDS cascades with ValueError."""
+        workspaceFds = _fds("ws", tableName="*", neutralize=True)
+        rootIf, _ = self._makeRootIf([workspaceFds])
+        self.assertRaises(
+            ValueError, _inheritFlags.cascadeResetDescendantsFds, rootIf, workspaceFds, "ragIndexEnabled",
+        )
+
+
+class TestPathNormalization(unittest.TestCase):
+    """Canonical path form produced by `_inheritFlags._normalisePath`."""
+
+    def test_empty_path_normalises_to_root(self):
+        """Both the empty string and None normalise to '/'."""
+        for emptyPath in ("", None):
+            self.assertEqual(_inheritFlags._normalisePath(emptyPath), "/")
+
+    def test_trailing_slash_stripped(self):
+        """A trailing slash is removed, except for the root itself."""
+        normalisedFolder = _inheritFlags._normalisePath("/foo/")
+        self.assertEqual(normalisedFolder, "/foo")
+        normalisedRoot = _inheritFlags._normalisePath("/")
+        self.assertEqual(normalisedRoot, "/")
+
+    def test_leading_slash_added(self):
+        """A relative-looking path gets a leading slash."""
+        normalised = _inheritFlags._normalisePath("foo/bar")
+        self.assertEqual(normalised, "/foo/bar")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/unit/services/test_knowledge_ingest_consumer.py b/tests/unit/services/test_knowledge_ingest_consumer.py
index 6b27a6e8..9884079e 100644
--- a/tests/unit/services/test_knowledge_ingest_consumer.py
+++ b/tests/unit/services/test_knowledge_ingest_consumer.py
@@ -99,11 +99,18 @@ def test_onConnectionRevoked_ignores_missing_id(monkeypatch):
assert seen == []
+def _stubRagEnabledDs(monkeypatch, dataSources):
+    """Patch `consumer._loadRagEnabledDataSources` to return `dataSources`.
+
+    The replacement accepts and ignores any arguments, so the tests run
+    without a live DB.
+    """
+    def _fakeLoad(*_args, **_kwargs):
+        return dataSources
+
+    monkeypatch.setattr(consumer, "_loadRagEnabledDataSources", _fakeLoad)
+
+
def test_bootstrap_job_skips_unsupported_authority(monkeypatch):
+ _stubRagEnabledDs(monkeypatch, [{"id": "ds1", "sourceType": "unknownType"}])
+
async def _run():
result = await consumer._bootstrapJobHandler(
{"payload": {"connectionId": "c1", "authority": "slack"}},
- lambda *_: None,
+ lambda *_, **__: None,
)
return result
@@ -114,13 +121,18 @@ def test_bootstrap_job_skips_unsupported_authority(monkeypatch):
def test_bootstrap_job_dispatches_msft_parts(monkeypatch):
+ _stubRagEnabledDs(monkeypatch, [
+ {"id": "ds1", "sourceType": "sharepointFolder"},
+ {"id": "ds2", "sourceType": "outlookFolder"},
+ ])
+
calls = {"sp": 0, "ol": 0}
- async def _fakeSp(connectionId, progressCb=None):
+ async def _fakeSp(connectionId, progressCb=None, dataSources=None):
calls["sp"] += 1
return {"indexed": 1}
- async def _fakeOl(connectionId, progressCb=None):
+ async def _fakeOl(connectionId, progressCb=None, dataSources=None):
calls["ol"] += 1
return {"indexed": 2}
@@ -142,7 +154,7 @@ def test_bootstrap_job_dispatches_msft_parts(monkeypatch):
async def _run():
return await consumer._bootstrapJobHandler(
{"payload": {"connectionId": "c1", "authority": "msft"}},
- lambda *_: None,
+ lambda *_, **__: None,
)
result = asyncio.run(_run())
@@ -152,13 +164,18 @@ def test_bootstrap_job_dispatches_msft_parts(monkeypatch):
def test_bootstrap_job_dispatches_google_parts(monkeypatch):
+ _stubRagEnabledDs(monkeypatch, [
+ {"id": "ds1", "sourceType": "googleDriveFolder"},
+ {"id": "ds2", "sourceType": "gmailFolder"},
+ ])
+
calls = {"gd": 0, "gm": 0}
- async def _fakeGd(connectionId, progressCb=None):
+ async def _fakeGd(connectionId, progressCb=None, dataSources=None):
calls["gd"] += 1
return {"indexed": 7}
- async def _fakeGm(connectionId, progressCb=None):
+ async def _fakeGm(connectionId, progressCb=None, dataSources=None):
calls["gm"] += 1
return {"indexed": 11}
@@ -180,7 +197,7 @@ def test_bootstrap_job_dispatches_google_parts(monkeypatch):
async def _run():
return await consumer._bootstrapJobHandler(
{"payload": {"connectionId": "c1", "authority": "google"}},
- lambda *_: None,
+ lambda *_, **__: None,
)
result = asyncio.run(_run())
@@ -190,9 +207,13 @@ def test_bootstrap_job_dispatches_google_parts(monkeypatch):
def test_bootstrap_job_dispatches_clickup_part(monkeypatch):
+ _stubRagEnabledDs(monkeypatch, [
+ {"id": "ds1", "sourceType": "clickupList"},
+ ])
+
calls = {"cu": 0}
- async def _fakeCu(connectionId, progressCb=None):
+ async def _fakeCu(connectionId, progressCb=None, dataSources=None):
calls["cu"] += 1
return {"indexed": 4}
@@ -207,7 +228,7 @@ def test_bootstrap_job_dispatches_clickup_part(monkeypatch):
async def _run():
return await consumer._bootstrapJobHandler(
{"payload": {"connectionId": "c1", "authority": "clickup"}},
- lambda *_: None,
+ lambda *_, **__: None,
)
result = asyncio.run(_run())
diff --git a/tests/unit/services/test_ragLimits.py b/tests/unit/services/test_ragLimits.py
new file mode 100644
index 00000000..bb336ed3
--- /dev/null
+++ b/tests/unit/services/test_ragLimits.py
@@ -0,0 +1,79 @@
+"""Unit tests for `_ragLimits` central helpers.
+
+Verifies:
+- defaults are returned as fresh copies (no mutation leakage)
+- getStoredOverrides returns ONLY explicit overrides (walker contract)
+- getRagLimits merges defaults with overrides (API/cost-estimate contract)
+- non-int values in stored settings are dropped, not silently coerced
+"""
+
+from __future__ import annotations
+
+import unittest
+
+from modules.serviceCenter.services.serviceKnowledge import _ragLimits
+
+
+class TestGetDefaults(unittest.TestCase):
+    """Default limit sets returned by `_ragLimits.getDefaults`."""
+
+    def test_files_defaults_have_all_keys(self):
+        """`files` defaults expose the four expected keys and a 200 MiB byte cap."""
+        defaults = _ragLimits.getDefaults("files")
+        expectedKeys = {"maxItems", "maxBytes", "maxFileSize", "maxDepth"}
+        self.assertEqual(set(defaults.keys()), expectedKeys)
+        self.assertEqual(defaults["maxBytes"], 200 * 1024 * 1024)
+
+    def test_clickup_defaults(self):
+        """`clickup` defaults expose exactly the three ClickUp limit keys."""
+        defaults = _ragLimits.getDefaults("clickup")
+        expectedKeys = {"maxTasks", "maxWorkspaces", "maxListsPerWorkspace"}
+        self.assertEqual(set(defaults.keys()), expectedKeys)
+
+    def test_defaults_are_a_fresh_copy(self):
+        """Mutating one returned dict must not affect the next call's result."""
+        firstCopy = _ragLimits.getDefaults("files")
+        firstCopy["maxBytes"] = 1
+        secondCopy = _ragLimits.getDefaults("files")
+        self.assertEqual(secondCopy["maxBytes"], 200 * 1024 * 1024)
+
+    def test_unknown_kind_raises(self):
+        """An unrecognised kind raises ValueError."""
+        self.assertRaises(ValueError, _ragLimits.getDefaults, "unknown")
+
+
+class TestGetStoredOverrides(unittest.TestCase):
+    """`_ragLimits.getStoredOverrides` returns explicit overrides and nothing else."""
+
+    def test_no_settings_returns_empty_dict(self):
+        """`settings: None` yields an empty override dict."""
+        dataSource = {"id": "x", "settings": None}
+        self.assertEqual(_ragLimits.getStoredOverrides(dataSource, "files"), {})
+
+    def test_only_explicit_overrides_returned(self):
+        """A single stored key comes back without any defaults mixed in."""
+        dataSource = {"id": "x", "settings": {"ragLimits": {"maxBytes": 999}}}
+        overrides = _ragLimits.getStoredOverrides(dataSource, "files")
+        self.assertEqual(overrides, {"maxBytes": 999})
+
+    def test_unknown_keys_dropped(self):
+        """Keys that are not `files` limits are filtered out."""
+        dataSource = {"id": "x", "settings": {"ragLimits": {"maxBytes": 999, "bogus": 1}}}
+        overrides = _ragLimits.getStoredOverrides(dataSource, "files")
+        self.assertEqual(overrides, {"maxBytes": 999})
+
+    def test_non_int_dropped(self):
+        """A string stored where an int is expected is discarded."""
+        dataSource = {"id": "x", "settings": {"ragLimits": {"maxBytes": "not-a-number"}}}
+        overrides = _ragLimits.getStoredOverrides(dataSource, "files")
+        self.assertEqual(overrides, {})
+
+    def test_none_or_garbage_settings_safe(self):
+        """A None data source or non-dict settings both yield an empty dict."""
+        self.assertEqual(_ragLimits.getStoredOverrides(None, "files"), {})
+        garbageSettings = {"id": "x", "settings": "garbage"}
+        self.assertEqual(_ragLimits.getStoredOverrides(garbageSettings, "files"), {})
+
+
+class TestGetRagLimits(unittest.TestCase):
+    """`_ragLimits.getRagLimits` merges stored overrides over the defaults."""
+
+    def test_no_settings_returns_defaults(self):
+        """Without stored settings the result equals FILES_LIMITS_DEFAULT."""
+        dataSource = {"id": "x", "settings": None}
+        limits = _ragLimits.getRagLimits(dataSource, "files")
+        self.assertEqual(limits, _ragLimits.FILES_LIMITS_DEFAULT)
+
+    def test_partial_override_merges_with_defaults(self):
+        """An overridden key wins; keys that were not overridden keep their default."""
+        dataSource = {"id": "x", "settings": {"ragLimits": {"maxBytes": 999}}}
+        limits = _ragLimits.getRagLimits(dataSource, "files")
+        self.assertEqual(limits["maxBytes"], 999)
+        self.assertEqual(limits["maxItems"], _ragLimits.FILES_LIMITS_DEFAULT["maxItems"])
+
+    def test_caller_can_distinguish_unset_from_set(self):
+        """Walker contract: an unset key MUST NOT appear in `getStoredOverrides`."""
+        dataSource = {"id": "x", "settings": {"ragLimits": {"maxBytes": 999}}}
+        storedOverrides = _ragLimits.getStoredOverrides(dataSource, "files")
+        self.assertIn("maxBytes", storedOverrides)
+        self.assertNotIn("maxItems", storedOverrides)
+
+
+if __name__ == "__main__":
+ unittest.main()