diff --git a/app.py b/app.py index d94c7dd5..93cc8b79 100644 --- a/app.py +++ b/app.py @@ -418,6 +418,9 @@ async def lifespan(app: FastAPI): registerKnowledgeIngestionConsumer, ) registerKnowledgeIngestionConsumer() + # Side-effect import: registers all walker progress message keys + # in the i18n registry so `syncRegistryToDb` picks them up. + from modules.serviceCenter.services.serviceKnowledge import _progressMessages # noqa: F401 except Exception as e: logger.warning(f"KnowledgeIngestionConsumer registration failed (non-critical): {e}") diff --git a/modules/datamodels/datamodelBackgroundJob.py b/modules/datamodels/datamodelBackgroundJob.py index fa99ea34..809fb994 100644 --- a/modules/datamodels/datamodelBackgroundJob.py +++ b/modules/datamodels/datamodelBackgroundJob.py @@ -96,6 +96,17 @@ class BackgroundJob(PowerOnModel): description="Human-readable current step (e.g. 'Importing journal entries...')", json_schema_extra={"label": "Fortschritts-Nachricht"}, ) + progressMessageData: Optional[Dict[str, Any]] = Field( + None, + description=( + "Structured i18n payload for `progressMessage`. Shape: " + "{'key': '', 'params': {...}}. " + "Frontend renders via `t(key, params)`; older clients fall back " + "to `progressMessage`. Single source of truth — keep `progressMessage` " + "as the rendered fallback in the producing language." 
+ ), + json_schema_extra={"label": "Fortschritts-Nachricht (i18n)"}, + ) payload: Dict[str, Any] = Field( default_factory=dict, diff --git a/modules/datamodels/datamodelDataSource.py b/modules/datamodels/datamodelDataSource.py index fe3f0442..de32bdf3 100644 --- a/modules/datamodels/datamodelDataSource.py +++ b/modules/datamodels/datamodelDataSource.py @@ -62,9 +62,14 @@ class DataSource(PowerOnModel): description="Owner user ID", json_schema_extra={"label": "Benutzer-ID", "fk_target": {"db": "poweron_app", "table": "UserInDB", "labelField": "username"}}, ) - ragIndexEnabled: bool = Field( - default=False, - description="When true this tree element is indexed into the RAG knowledge store", + ragIndexEnabled: Optional[bool] = Field( + default=None, + description=( + "Three-state RAG indexing flag with cascade-inherit semantics. " + "None = inherit from nearest ancestor DataSource (path-traversal); " + "True/False = explicit override that propagates to descendants. " + "Walker computes effective value via getEffectiveFlag()." + ), json_schema_extra={"label": "Im RAG indexieren", "frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False}, ) lastIndexed: Optional[float] = Field( @@ -72,9 +77,13 @@ class DataSource(PowerOnModel): description="Timestamp of last successful RAG indexing run", json_schema_extra={"label": "Letzte Indexierung", "frontend_type": "timestamp"}, ) - scope: str = Field( - default="personal", - description="Data visibility scope: personal, featureInstance, mandate, global", + scope: Optional[str] = Field( + default=None, + description=( + "Data visibility scope with inherit semantics. " + "None = inherit; values: personal, featureInstance, mandate, global. " + "Cascade-reset on parent toggle." 
+ ), json_schema_extra={"label": "Sichtbarkeit", "frontend_type": "select", "frontend_readonly": False, "frontend_required": False, "frontend_options": [ {"value": "personal", "label": "Persönlich"}, {"value": "featureInstance", "label": "Feature-Instanz"}, @@ -82,11 +91,25 @@ class DataSource(PowerOnModel): {"value": "global", "label": "Global"}, ]}, ) - neutralize: bool = Field( - default=False, - description="Whether this data source should be neutralized before AI processing", + neutralize: Optional[bool] = Field( + default=None, + description=( + "Three-state neutralization flag with cascade-inherit semantics. " + "None = inherit from nearest ancestor DataSource (path-traversal); " + "True/False = explicit override that propagates to descendants." + ), json_schema_extra={"label": "Neutralisieren", "frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False}, ) + settings: Optional[Dict[str, Any]] = Field( + default=None, + description=( + "DataSource-scoped settings (JSON). Currently used keys: " + "ragLimits.{maxBytes,maxFileSize,maxItems,maxDepth}. " + "Walker reads these directly; missing keys fall back to RAG_LIMITS_DEFAULT " + "and are lazily persisted on next bootstrap." + ), + json_schema_extra={"label": "Einstellungen", "frontend_type": "json", "frontend_readonly": True, "frontend_required": False}, + ) class ExternalEntry(BaseModel): diff --git a/modules/datamodels/datamodelFeatureDataSource.py b/modules/datamodels/datamodelFeatureDataSource.py index dd2c4035..f07a8bda 100644 --- a/modules/datamodels/datamodelFeatureDataSource.py +++ b/modules/datamodels/datamodelFeatureDataSource.py @@ -6,7 +6,7 @@ A FeatureDataSource links a FeatureInstance table (DATA_OBJECT) to a workspace so the agent can query structured feature data (e.g. TrusteePosition rows). 
""" -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field from modules.datamodels.datamodelBase import PowerOnModel from modules.shared.i18nRegistry import i18nModel @@ -55,9 +55,12 @@ class FeatureDataSource(PowerOnModel): description="Workspace feature instance where this source is used", json_schema_extra={"label": "Workspace", "fk_target": {"db": "poweron_app", "table": "FeatureInstance", "labelField": "label"}}, ) - scope: str = Field( - default="personal", - description="Data visibility scope: personal, featureInstance, mandate, global", + scope: Optional[str] = Field( + default=None, + description=( + "Data visibility scope with inherit semantics. " + "None = inherit; values: personal, featureInstance, mandate, global." + ), json_schema_extra={"label": "Sichtbarkeit", "frontend_type": "select", "frontend_readonly": False, "frontend_required": False, "frontend_options": [ {"value": "personal", "label": "Persönlich"}, {"value": "featureInstance", "label": "Feature-Instanz"}, @@ -65,9 +68,12 @@ class FeatureDataSource(PowerOnModel): {"value": "global", "label": "Global"}, ]}, ) - neutralize: bool = Field( - default=False, - description="Whether this data source should be neutralized before AI processing", + neutralize: Optional[bool] = Field( + default=None, + description=( + "Three-state neutralization flag with cascade-inherit semantics. " + "None = inherit; True/False = explicit. Cascade-reset on parent toggle." + ), json_schema_extra={"label": "Neutralisieren", "frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False}, ) neutralizeFields: Optional[List[str]] = Field( @@ -80,3 +86,12 @@ class FeatureDataSource(PowerOnModel): description="Record-level filter applied when querying this table, e.g. 
{'sessionId': 'abc-123'}", json_schema_extra={"label": "Datensatzfilter"}, ) + settings: Optional[Dict[str, Any]] = Field( + default=None, + description=( + "FeatureDataSource-scoped settings (JSON). Currently used keys: " + "ragLimits.{maxBytes,maxFileSize,maxItems,maxDepth}. " + "Mirror of DataSource.settings so the UDB settings modal can target both." + ), + json_schema_extra={"label": "Einstellungen", "frontend_type": "json", "frontend_readonly": True, "frontend_required": False}, + ) diff --git a/modules/features/trustee/accounting/accountingDataSync.py b/modules/features/trustee/accounting/accountingDataSync.py index 5827dd11..db50d657 100644 --- a/modules/features/trustee/accounting/accountingDataSync.py +++ b/modules/features/trustee/accounting/accountingDataSync.py @@ -205,11 +205,16 @@ class AccountingDataSync: boundary so the UI poll on ``GET /api/jobs/{jobId}`` shows real movement instead of jumping from 10 % to 100 %. Safe to omit. """ - def _progress(pct: int, msg: str) -> None: + def _progress(pct: int, msgKey: str, msgParams: Optional[Dict[str, Any]] = None) -> None: + """Forward to progressCb using the i18n contract. + + `msgKey` is the German plaintext-as-key; the frontend translates + it via `t(key, params)` when rendering. + """ if progressCb is None: return try: - progressCb(pct, msg) + progressCb(pct, messageKey=msgKey, messageParams=msgParams or {}) except Exception as ex: logger.warning(f"progressCb failed at {pct}%: {ex}") from modules.features.trustee.datamodelFeatureTrustee import ( diff --git a/modules/features/trustee/mainTrustee.py b/modules/features/trustee/mainTrustee.py index 8f725d2f..b3f7cdcf 100644 --- a/modules/features/trustee/mainTrustee.py +++ b/modules/features/trustee/mainTrustee.py @@ -12,6 +12,27 @@ from modules.shared.i18nRegistry import t logger = logging.getLogger(__name__) +# i18n: register BackgroundJob progress message keys used by routeFeatureTrustee / +# accountingDataSync. 
Walker call sites use `progressCb(..., messageKey="…")` +# without going through `t()`, so we must register each key here as a +# string-literal `t(...)` call -- per i18n convention `t()` MUST receive a +# literal so static scanners and the boot-time `syncRegistryToDb` can pick +# it up. Do NOT collapse these into a loop over a list of variables. +t("Sync wird vorbereitet ({total} Position(en))...") +t("Verbindungsaufbau fehlgeschlagen.") +t("Keine aktive Buchhaltungs-Konfiguration gefunden.") +t("Position {index}/{total} verarbeitet") +t("Sync abgeschlossen.") +t("Initialisiere Import...") +t("Verbinde mit Buchhaltungssystem...") +t("Import abgeschlossen.") +t("Lade Kontenplan...") +t("Lade Journaleintraege vom Buchhaltungssystem...") +t("Lade Kunden...") +t("Lade Lieferanten...") +t("Lade Kontensaldi vom Buchhaltungssystem...") +t("Speichere Kontensaldi...") + # Feature metadata FEATURE_CODE = "trustee" FEATURE_LABEL = t("Treuhand", context="UI") diff --git a/modules/features/trustee/routeFeatureTrustee.py b/modules/features/trustee/routeFeatureTrustee.py index 2c9c3328..a71b508f 100644 --- a/modules/features/trustee/routeFeatureTrustee.py +++ b/modules/features/trustee/routeFeatureTrustee.py @@ -1644,7 +1644,11 @@ async def _trusteeAccountingPushJobHandler(job: Dict[str, Any], progressCb) -> D results = [] total = len(positionIds) - progressCb(2, f"Sync wird vorbereitet ({total} Position(en))...") + progressCb( + 2, + messageKey="Sync wird vorbereitet ({total} Position(en))...", + messageParams={"total": total}, + ) # Resolve connector + plain config once to avoid decryption rate-limits # (mirrors the optimisation in pushBatchToAccounting). 
We push positions @@ -1655,12 +1659,12 @@ async def _trusteeAccountingPushJobHandler(job: Dict[str, Any], progressCb) -> D connector, plainConfig, configRecord = await bridge._resolveConnectorAndConfig(instanceId) except Exception as resolveErr: logger.exception("Accounting push: failed to resolve connector/config") - progressCb(100, "Verbindungsaufbau fehlgeschlagen.") + progressCb(100, messageKey="Verbindungsaufbau fehlgeschlagen.") raise resolveErr if not connector or not plainConfig: results = [SyncResult(success=False, errorMessage="No active accounting configuration found") for _ in positionIds] - progressCb(100, "Keine aktive Buchhaltungs-Konfiguration gefunden.") + progressCb(100, messageKey="Keine aktive Buchhaltungs-Konfiguration gefunden.") return { "total": len(results), "success": 0, @@ -1680,7 +1684,11 @@ async def _trusteeAccountingPushJobHandler(job: Dict[str, Any], progressCb) -> D results.append(result) # Reserve 5..95% for the push loop, keep the tail for summary. pct = 5 + int(90 * index / total) - progressCb(pct, f"Position {index}/{total} verarbeitet") + progressCb( + pct, + messageKey="Position {index}/{total} verarbeitet", + messageParams={"index": index, "total": total}, + ) skipped = [r for r in results if not r.success and r.errorMessage and "already synced" in r.errorMessage] failed = [r for r in results if not r.success and r not in skipped] @@ -1693,7 +1701,7 @@ async def _trusteeAccountingPushJobHandler(job: Dict[str, Any], progressCb) -> D "; ".join(r.errorMessage or "unknown" for r in failed[:3]), ) - progressCb(100, "Sync abgeschlossen.") + progressCb(100, messageKey="Sync abgeschlossen.") return { "total": len(results), "success": sum(1 for r in results if r.success), @@ -1823,10 +1831,10 @@ async def _trusteeAccountingSyncJobHandler(job: Dict[str, Any], progressCb) -> D payload = job.get("payload") or {} rootUser = getRootUser() - progressCb(5, "Initialisiere Import...") + progressCb(5, messageKey="Initialisiere Import...") 
interface = getInterface(rootUser, mandateId=mandateId, featureInstanceId=instanceId) sync = AccountingDataSync(interface) - progressCb(10, "Verbinde mit Buchhaltungssystem...") + progressCb(10, messageKey="Verbinde mit Buchhaltungssystem...") result = await sync.importData( featureInstanceId=instanceId, mandateId=mandateId, @@ -1834,7 +1842,7 @@ async def _trusteeAccountingSyncJobHandler(job: Dict[str, Any], progressCb) -> D dateTo=payload.get("dateTo"), progressCb=progressCb, ) - progressCb(100, "Import abgeschlossen.") + progressCb(100, messageKey="Import abgeschlossen.") return result diff --git a/modules/features/workspace/routeFeatureWorkspace.py b/modules/features/workspace/routeFeatureWorkspace.py index 4487e5fe..2fa788e8 100644 --- a/modules/features/workspace/routeFeatureWorkspace.py +++ b/modules/features/workspace/routeFeatureWorkspace.py @@ -1324,6 +1324,7 @@ async def listWorkspaceConnections( "externalUsername": conn.get("externalUsername"), "externalEmail": conn.get("externalEmail"), "status": status, + "knowledgeIngestionEnabled": bool(conn.get("knowledgeIngestionEnabled")), }) return JSONResponse({"connections": items}) diff --git a/modules/routes/routeDataSources.py b/modules/routes/routeDataSources.py index ba398008..5dec19c8 100644 --- a/modules/routes/routeDataSources.py +++ b/modules/routes/routeDataSources.py @@ -9,11 +9,40 @@ from fastapi import APIRouter, HTTPException, Depends, Path, Request, Body from modules.auth import limiter, getRequestContext, RequestContext from modules.datamodels.datamodelDataSource import DataSource from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource +from modules.datamodels.datamodelUam import UserConnection from modules.shared.i18nRegistry import apiRouteContext routeApiMsg = apiRouteContext("routeDataSources") logger = logging.getLogger(__name__) + +def _ensureConnectionKnowledgeFlag(rootIf, connectionId: str) -> None: + """Forward-only sync: if a DataSource gets RAG-activated, ensure 
the parent + UserConnection.knowledgeIngestionEnabled is true. + + Intentionally NOT bidirectional: disabling the last DataSource does NOT + auto-clear knowledgeIngestionEnabled, because the consent flag may have + been set explicitly via the Connections page / wizard even before any + DataSource exists. Only the master switch (`/knowledge-consent`) may + clear it. + """ + if not connectionId: + return + try: + currentConn = rootIf.db.getRecord(UserConnection, connectionId) + if not currentConn: + return + if bool(currentConn.get("knowledgeIngestionEnabled")): + return + rootIf.db.recordModify(UserConnection, connectionId, {"knowledgeIngestionEnabled": True}) + logger.info( + "Auto-enabled knowledgeIngestionEnabled on UserConnection %s " + "(triggered by first active DataSource).", + connectionId, + ) + except Exception as e: + logger.warning("Could not auto-enable knowledgeIngestionEnabled for connection %s: %s", connectionId, e) + router = APIRouter( prefix="/api/datasources", tags=["Data Sources"], @@ -45,26 +74,43 @@ def _findSourceRecord(db, sourceId: str): def _updateDataSourceScope( request: Request, sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"), - scope: str = Body(..., embed=True), + scope: Optional[str] = Body(None, embed=True), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: - """Update the scope of a DataSource or FeatureDataSource. Global scope requires sysAdmin.""" - if scope not in _VALID_SCOPES: - raise HTTPException(status_code=400, detail=f"Invalid scope: {scope}. Must be one of {_VALID_SCOPES}") + """Update the scope of a DataSource. Cascade-resets explicit descendants. - if scope == "global" and not context.isSysAdmin: - raise HTTPException(status_code=403, detail=routeApiMsg("Only sysadmins can set global scope")) + `scope=None` resets this node to inherit (no cascade). Global scope + requires sysAdmin. 
+ """ + if scope is not None: + if scope not in _VALID_SCOPES: + raise HTTPException(status_code=400, detail=f"Invalid scope: {scope}. Must be one of {_VALID_SCOPES}") + if scope == "global" and not context.isSysAdmin: + raise HTTPException(status_code=403, detail=routeApiMsg("Only sysadmins can set global scope")) try: from modules.interfaces.interfaceDbApp import getRootInterface + from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( + cascadeResetDescendants, + cascadeResetDescendantsFds, + ) rootIf = getRootInterface() rec, model = _findSourceRecord(rootIf.db, sourceId) if not rec: raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found") rootIf.db.recordModify(model, sourceId, {"scope": scope}) - logger.info("Updated scope=%s for %s %s", scope, model.__name__, sourceId) - return {"sourceId": sourceId, "scope": scope, "updated": True} + cascaded = 0 + if scope is not None: + if model is DataSource: + cascaded = cascadeResetDescendants(rootIf, rec, "scope") + else: + cascaded = cascadeResetDescendantsFds(rootIf, rec, "scope") + logger.info( + "Updated scope=%s for %s %s (cascade-reset %d descendants)", + scope, model.__name__, sourceId, cascaded, + ) + return {"sourceId": sourceId, "scope": scope, "updated": True, "cascadedDescendants": cascaded} except HTTPException: raise except Exception as e: @@ -77,20 +123,36 @@ def _updateDataSourceScope( def _updateDataSourceNeutralize( request: Request, sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"), - neutralize: bool = Body(..., embed=True), + neutralize: Optional[bool] = Body(None, embed=True), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: - """Toggle the neutralization flag on a DataSource or FeatureDataSource.""" + """Set neutralize flag on a DataSource. Cascade-resets explicit descendants. + + `neutralize=None` resets this node to inherit (no cascade). 
+ """ try: from modules.interfaces.interfaceDbApp import getRootInterface + from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( + cascadeResetDescendants, + cascadeResetDescendantsFds, + ) rootIf = getRootInterface() rec, model = _findSourceRecord(rootIf.db, sourceId) if not rec: raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found") rootIf.db.recordModify(model, sourceId, {"neutralize": neutralize}) - logger.info("Updated neutralize=%s for %s %s", neutralize, model.__name__, sourceId) - return {"sourceId": sourceId, "neutralize": neutralize, "updated": True} + cascaded = 0 + if neutralize is not None: + if model is DataSource: + cascaded = cascadeResetDescendants(rootIf, rec, "neutralize") + else: + cascaded = cascadeResetDescendantsFds(rootIf, rec, "neutralize") + logger.info( + "Updated neutralize=%s for %s %s (cascade-reset %d descendants)", + neutralize, model.__name__, sourceId, cascaded, + ) + return {"sourceId": sourceId, "neutralize": neutralize, "updated": True, "cascadedDescendants": cascaded} except HTTPException: raise except Exception as e: @@ -132,13 +194,14 @@ def _updateNeutralizeFields( async def _updateDataSourceRagIndex( request: Request, sourceId: str = Path(..., description="ID of the DataSource"), - ragIndexEnabled: bool = Body(..., embed=True), + ragIndexEnabled: Optional[bool] = Body(None, embed=True), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: - """Toggle RAG indexing for a DataSource. + """Set RAG indexing flag on a DataSource. Cascade-resets explicit descendants. - true: sets flag + enqueues mini-bootstrap for this DataSource only. - false: sets flag + synchronously purges all chunks from this DataSource. + `ragIndexEnabled=None` resets this node to inherit (no cascade, no purge, + no bootstrap — the node simply follows its ancestor chain afterwards). + `True` enqueues a mini-bootstrap. `False` synchronously purges chunks. 
Must be `async def` so `await startJob(...)` registers `_runJob` in the main event loop. Sync route → worker thread → temporary loop closes @@ -146,18 +209,26 @@ async def _updateDataSourceRagIndex( """ try: from modules.interfaces.interfaceDbApp import getRootInterface + from modules.serviceCenter.services.serviceKnowledge._inheritFlags import cascadeResetDescendants rootIf = getRootInterface() rec = rootIf.db.getRecord(DataSource, sourceId) if not rec: raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found") rootIf.db.recordModify(DataSource, sourceId, {"ragIndexEnabled": ragIndexEnabled}) - logger.info("Updated ragIndexEnabled=%s for DataSource %s", ragIndexEnabled, sourceId) + cascaded = 0 + if ragIndexEnabled is not None: + cascaded = cascadeResetDescendants(rootIf, rec, "ragIndexEnabled") + logger.info( + "Updated ragIndexEnabled=%s for DataSource %s (cascade-reset %d descendants)", + ragIndexEnabled, sourceId, cascaded, + ) - if ragIndexEnabled: + connectionId = rec.get("connectionId") or rec.get("connection_id") or "" + if ragIndexEnabled is True: + _ensureConnectionKnowledgeFlag(rootIf, connectionId) from modules.serviceCenter.services.serviceBackgroundJobs import startJob - connectionId = rec.get("connectionId") or rec.get("connection_id") or "" conn = rootIf.getUserConnectionById(connectionId) if connectionId else None authority = "" if conn: @@ -168,7 +239,7 @@ async def _updateDataSourceRagIndex( {"connectionId": connectionId, "authority": authority.lower(), "dataSourceIds": [sourceId]}, triggeredBy=str(context.user.id), ) - else: + elif ragIndexEnabled is False: from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface purgeResult = getKnowledgeInterface(None).deleteFileContentIndexByDataSource(sourceId) logger.info("Purged %d index rows / %d chunks for DataSource %s", @@ -182,12 +253,164 @@ async def _updateDataSourceRagIndex( mandateId=context.mandateId, category=AuditCategory.PERMISSION.value, 
action="rag_index_toggled", - details=json.dumps({"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled}), + details=json.dumps({"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled, "cascadedDescendants": cascaded}), ) - return {"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled, "updated": True} + return {"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled, "updated": True, "cascadedDescendants": cascaded} except HTTPException: raise except Exception as e: logger.error("Error updating datasource ragIndexEnabled: %s", e) raise HTTPException(status_code=500, detail=str(e)) + + +_CLICKUP_SOURCE_TYPES = {"clickup", "clickupList", "clickupSpace", "clickupFolder"} +_ALLOWED_RAG_LIMIT_KEYS = { + "files": {"maxItems", "maxBytes", "maxFileSize", "maxDepth"}, + "clickup": {"maxTasks", "maxWorkspaces", "maxListsPerWorkspace"}, +} + + +def _kindForSource(rec: Dict[str, Any], model) -> str: + """Map a DataSource record to a RAG-limits kind ('files' or 'clickup'). + + FeatureDataSource (tables, not file walkers) reports as 'files' so the + same UI/limit shape works; the limits simply won't be consumed by any + walker today but are stored for forward-compat. + """ + if model is FeatureDataSource: + return "files" + sourceType = str(rec.get("sourceType") or "").strip() + return "clickup" if sourceType in _CLICKUP_SOURCE_TYPES else "files" + + +def _sanitizeRagLimits(kind: str, raw: Any) -> Dict[str, int]: + """Coerce an incoming ragLimits dict to {allowedKey: positive int}. + + Unknown keys are silently dropped; non-positive or non-numeric values + are rejected with 400. 
+ """ + if not isinstance(raw, dict): + raise HTTPException(status_code=400, detail="ragLimits must be an object") + allowed = _ALLOWED_RAG_LIMIT_KEYS.get(kind, set()) + cleaned: Dict[str, int] = {} + for key, value in raw.items(): + if key not in allowed: + continue + try: + intValue = int(value) + except (TypeError, ValueError): + raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be an integer") + if intValue <= 0: + raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be > 0") + cleaned[key] = intValue + return cleaned + + +@router.patch("/{sourceId}/settings") +@limiter.limit("30/minute") +def _updateDataSourceSettings( + request: Request, + sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"), + settings: Dict[str, Any] = Body(..., embed=True), + context: RequestContext = Depends(getRequestContext), +) -> Dict[str, Any]: + """Replace `settings` on a DataSource or FeatureDataSource (partial merge per top-level key). + + Currently supports `ragLimits` only. Unknown top-level keys in the body are + rejected to avoid silently storing garbage that no consumer reads. + + Owner-only for personal DataSources; mandate/feature scopes additionally + accept the mandate or workspace admins of that scope. 
+ """ + if not isinstance(settings, dict): + raise HTTPException(status_code=400, detail="settings must be an object") + unknown = set(settings.keys()) - {"ragLimits"} + if unknown: + raise HTTPException(status_code=400, detail=f"Unknown settings keys: {sorted(unknown)}") + + try: + from modules.interfaces.interfaceDbApp import getRootInterface + rootIf = getRootInterface() + rec, model = _findSourceRecord(rootIf.db, sourceId) + if not rec: + raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found") + + ownerId = str(rec.get("userId") or "") + currentUserId = str(context.user.id) + if ownerId and ownerId != currentUserId and not context.isSysAdmin: + scope = str(rec.get("scope") or "personal") + isMandateAdmin = getattr(context, "isMandateAdmin", False) + if scope == "personal" or not isMandateAdmin: + raise HTTPException(status_code=403, detail="Not allowed to modify this DataSource's settings") + + kind = _kindForSource(rec, model) + + currentSettings = rec.get("settings") or {} + if not isinstance(currentSettings, dict): + currentSettings = {} + newSettings = dict(currentSettings) + + if "ragLimits" in settings: + cleanedLimits = _sanitizeRagLimits(kind, settings["ragLimits"]) + mergedLimits = dict(currentSettings.get("ragLimits") or {}) + mergedLimits.update(cleanedLimits) + newSettings["ragLimits"] = mergedLimits + + rootIf.db.recordModify(model, sourceId, {"settings": newSettings}) + + import json + from modules.shared.auditLogger import audit_logger + from modules.datamodels.datamodelAudit import AuditCategory + audit_logger.logEvent( + userId=currentUserId, + mandateId=context.mandateId, + category=AuditCategory.PERMISSION.value, + action="datasource_settings_changed", + details=json.dumps({ + "sourceId": sourceId, + "model": model.__name__, + "oldSettings": currentSettings, + "newSettings": newSettings, + }), + ) + logger.info("Updated settings on %s %s by user %s", model.__name__, sourceId, currentUserId) + return {"sourceId": 
sourceId, "settings": newSettings, "updated": True} + except HTTPException: + raise + except Exception as e: + logger.error("Error updating datasource settings: %s", e, exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/{sourceId}/cost-estimate") +@limiter.limit("60/minute") +def _getDataSourceCostEstimate( + request: Request, + sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"), + context: RequestContext = Depends(getRequestContext), +) -> Dict[str, Any]: + """Return an indicative full-sync cost estimate for the given DataSource. + + Uses the current effective ragLimits (DataSource.settings.ragLimits with + fallback to centralized defaults) as the basis. Returns the same + `{estimatedTokens, estimatedUsd, basis}` shape regardless of source kind. + """ + try: + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.serviceCenter.services.serviceKnowledge import _ragLimits, _costEstimate + rootIf = getRootInterface() + rec, model = _findSourceRecord(rootIf.db, sourceId) + if not rec: + raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found") + + kind = _kindForSource(rec, model) + effective = _ragLimits.getRagLimits(rec, kind) + estimate = _costEstimate.estimateBootstrapCost(effective, kind=kind) + estimate["sourceId"] = sourceId + return estimate + except HTTPException: + raise + except Exception as e: + logger.error("Error computing cost estimate: %s", e, exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) diff --git a/modules/routes/routeJobs.py b/modules/routes/routeJobs.py index d2124a0b..9cd89d46 100644 --- a/modules/routes/routeJobs.py +++ b/modules/routes/routeJobs.py @@ -21,7 +21,7 @@ from modules.serviceCenter.services.serviceBackgroundJobs import ( getJobStatus, listJobs, ) -from modules.shared.i18nRegistry import apiRouteContext +from modules.shared.i18nRegistry import apiRouteContext, resolveJobMessage logger = 
logging.getLogger(__name__) routeApiMsg = apiRouteContext("routeJobs") @@ -34,8 +34,20 @@ router = APIRouter( def _serialiseJob(job: Dict[str, Any]) -> Dict[str, Any]: - """Strip system audit fields and ensure JSON-safe types.""" - return {k: v for k, v in job.items() if not k.startswith("sys")} + """Strip system audit fields, ensure JSON-safe types, translate progress. + + Walkers store progress as a structured payload (``progressMessageData = + {key, params}``). The frontend never calls ``t()`` on backend-supplied + keys (i18n convention #2), so we resolve the payload here using the + request-context language and overwrite ``progressMessage`` with the + fully rendered string. Older clients keep working because they read + the same field. + """ + out = {k: v for k, v in job.items() if not k.startswith("sys")} + translated = resolveJobMessage(out.get("progressMessageData")) + if translated: + out["progressMessage"] = translated + return out def _userHasMandateAccess(context: RequestContext, mandateId: Optional[str]) -> bool: diff --git a/modules/routes/routeRagInventory.py b/modules/routes/routeRagInventory.py index 7c426d77..99d5c4df 100644 --- a/modules/routes/routeRagInventory.py +++ b/modules/routes/routeRagInventory.py @@ -8,7 +8,7 @@ from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException, Depends, Request from modules.auth import limiter, getCurrentUser, getRequestContext, RequestContext from modules.datamodels.datamodelUam import User -from modules.shared.i18nRegistry import apiRouteContext +from modules.shared.i18nRegistry import apiRouteContext, resolveJobMessage routeApiMsg = apiRouteContext("routeRagInventory") logger = logging.getLogger(__name__) @@ -24,6 +24,53 @@ router = APIRouter( ) +_SUB_RESULT_KEYS = ("sharepoint", "outlook", "drive", "gmail", "clickup", "kdrive") + + +def _flattenJobResult(result: Dict[str, Any]) -> Dict[str, Any]: + """Bootstrap handlers nest per-service results (e.g. 
msft returns + `{"sharepoint": {...}, "outlook": {...}}`). The UI needs per-connection + aggregates AND the first hit limit, so we sum the counters and pick the + most informative `stoppedAtLimit` across sub-services. + + Returns a flat dict with the same keys the UI expects on `lastSuccess`. + """ + subResults = [result[k] for k in _SUB_RESULT_KEYS if isinstance(result.get(k), dict)] + if not subResults: + # Single-service handler that returns flat dict directly (legacy path). + return result + + indexed = sum(int(r.get("indexed") or 0) for r in subResults) + skippedDup = sum(int(r.get("skippedDuplicate") or 0) for r in subResults) + skippedPol = sum(int(r.get("skippedPolicy") or 0) for r in subResults) + failed = sum(int(r.get("failed") or 0) for r in subResults) + bytes_ = sum(int(r.get("bytesProcessed") or 0) for r in subResults) + # Parallel sub-services: wall-clock ≈ slowest one. + durationMs = max((int(r.get("durationMs") or 0) for r in subResults), default=0) + + # First sub-service that hit a limit wins — UI shows one banner per + # connection; if multiple stopped, the first one is informative enough + # and the user re-runs after raising that budget. + stoppedAtLimit: Optional[str] = None + limits: Dict[str, Any] = {} + for r in subResults: + if r.get("stoppedAtLimit"): + stoppedAtLimit = r["stoppedAtLimit"] + limits = r.get("limits") or {} + break + + return { + "indexed": indexed, + "skippedDuplicate": skippedDup, + "skippedPolicy": skippedPol, + "failed": failed, + "bytesProcessed": bytes_, + "durationMs": durationMs, + "stoppedAtLimit": stoppedAtLimit, + "limits": limits, + } + + def _buildConnectionInventory(connections, rootIf, knowledgeIf, jobService) -> List[Dict[str, Any]]: """Build per-connection RAG inventory rows. 
@@ -111,7 +158,17 @@ def _buildConnectionInventory(connections, rootIf, knowledgeIf, jobService) -> L jobs = jobService.listJobs(jobType="connection.bootstrap", limit=50) connJobs = [j for j in jobs if (j.get("payload") or {}).get("connectionId") == connectionId] runningJobs = [ - {"jobId": j["id"], "progress": j.get("progress", 0), "progressMessage": j.get("progressMessage", "")} + { + "jobId": j["id"], + "progress": j.get("progress", 0), + # Server-side translate the structured walker payload into + # the request-context language; frontend renders 1:1 (no + # `t()` on backend-supplied keys). + "progressMessage": ( + resolveJobMessage(j.get("progressMessageData")) + or j.get("progressMessage", "") + ), + } for j in connJobs if j.get("status") in ("PENDING", "RUNNING") ] @@ -126,7 +183,12 @@ def _buildConnectionInventory(connections, rootIf, knowledgeIf, jobService) -> L "finishedAt": j.get("finishedAt"), } elif status == "SUCCESS" and lastSuccess is None: - result = j.get("result") or {} + # Bootstrap handlers may return either a flat dict (single + # service) or a nested dict keyed by sub-service (e.g. msft + # returns {"sharepoint": {...}, "outlook": {...}}). Flatten + # so the UI always sees aggregated counters and the first + # sub-service that hit a limit. 
+ result = _flattenJobResult(j.get("result") or {}) lastSuccess = { "jobId": j["id"], "finishedAt": j.get("finishedAt"), @@ -337,7 +399,10 @@ def _getActiveJobs( "connectionLabel": getattr(conn, "displayLabel", None) or getattr(conn, "authority", connId), "jobType": j.get("jobType", "connection.bootstrap"), "progress": j.get("progress", 0), - "progressMessage": j.get("progressMessage", ""), + "progressMessage": ( + resolveJobMessage(j.get("progressMessageData")) + or j.get("progressMessage", "") + ), }) return active except Exception as e: diff --git a/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py b/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py index e27dae58..90b69bce 100644 --- a/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py +++ b/modules/serviceCenter/services/serviceBackgroundJobs/mainBackgroundJobService.py @@ -54,19 +54,53 @@ _CANCEL_CHECK_INTERVAL_S = 3.0 class JobProgressCallback: - """Callable progress reporter with cooperative cancel-check for long-running walkers.""" + """Callable progress reporter with cooperative cancel-check for long-running walkers. + + Two ways to set a progress message: + progressCb(50, "145 Dateien verarbeitet") # legacy plaintext (DE) + progressCb(50, messageKey="{n} Dateien verarbeitet", + messageParams={"n": 145}) # i18n-friendly + + When `messageKey` is given the structured payload is written to + `BackgroundJob.progressMessageData` so the frontend can render it via + `t(key, params)` in the user's UI language. A best-effort rendered + fallback is also stored in `progressMessage` for older clients, logs, + and audit trails. 
+ """ def __init__(self, jobId: str): self._jobId = jobId self._cancelledCache: Optional[bool] = None self._lastCheckedAt: float = 0.0 - def __call__(self, progress: int, message: Optional[str] = None) -> None: + def __call__( + self, + progress: int, + message: Optional[str] = None, + *, + messageKey: Optional[str] = None, + messageParams: Optional[Dict[str, Any]] = None, + ) -> None: try: clamped = max(0, min(100, int(progress))) fields: Dict[str, Any] = {"progress": clamped} - if message is not None: + + if messageKey is not None: + params = messageParams or {} + try: + fallback = messageKey.format(**params) + except (KeyError, IndexError, ValueError) as fmtErr: + fallback = message or messageKey + logger.warning( + "progressCb message format failed for job %s key=%r params=%r: %s", + self._jobId, messageKey, params, fmtErr, + ) + fields["progressMessageData"] = {"key": messageKey, "params": params} + fields["progressMessage"] = (message or fallback)[:500] + elif message is not None: fields["progressMessage"] = message[:500] + fields["progressMessageData"] = None + _updateJob(self._jobId, fields) except Exception as ex: logger.warning("Progress update failed for job %s: %s", self._jobId, ex) diff --git a/modules/serviceCenter/services/serviceChat/mainServiceChat.py b/modules/serviceCenter/services/serviceChat/mainServiceChat.py index 2ca61d7e..61026de0 100644 --- a/modules/serviceCenter/services/serviceChat/mainServiceChat.py +++ b/modules/serviceCenter/services/serviceChat/mainServiceChat.py @@ -534,11 +534,17 @@ class ChatService: ) -> Dict[str, Any]: """Create a new external data source reference. - Returns existing record if connectionId + path already exists (upsert semantics). + Upsert key is `(connectionId, sourceType, path)`. The same `path='/'` + can carry multiple DataSources discriminated by sourceType: the + Connection-Root (sourceType=, e.g. 'msft') plus one per + service (sourceType='sharepointFolder', 'outlookFolder', ...). 
The + sourceType filter MUST be present, otherwise a Service-Root POST + returns the Connection-Root and toggles cascade onto every sibling. """ from modules.datamodels.datamodelDataSource import DataSource existing = self.interfaceDbApp.db.getRecordset( - DataSource, recordFilter={"connectionId": connectionId, "path": path} + DataSource, + recordFilter={"connectionId": connectionId, "sourceType": sourceType, "path": path}, ) if existing: return existing[0] if isinstance(existing[0], dict) else existing[0].model_dump() diff --git a/modules/serviceCenter/services/serviceKnowledge/_costEstimate.py b/modules/serviceCenter/services/serviceKnowledge/_costEstimate.py new file mode 100644 index 00000000..565c219d --- /dev/null +++ b/modules/serviceCenter/services/serviceKnowledge/_costEstimate.py @@ -0,0 +1,86 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Indicative cost estimation for a RAG bootstrap run. + +This is **not** a billing-grade forecast: it gives the user a back-of-the-envelope +USD figure for the worst-case full sync, so they can sanity-check before raising +`maxBytes`/`maxItems`. The output always carries the underlying assumptions +(`basis`) so the user can judge plausibility. + +Heuristic: + estimatedTokens = ceil(maxBytes / CHARS_PER_TOKEN_BYTES_FACTOR) + estimatedUsd = estimatedTokens / 1_000_000 * EMBEDDING_USD_PER_MTOKEN + +Defaults match OpenAI `text-embedding-3-small` pricing (2026-Q2). +""" + +from __future__ import annotations + +import math +from typing import Any, Dict + + +CHARS_PER_TOKEN = 4 +EMBEDDING_USD_PER_MTOKEN = 0.02 +DEFAULT_TOKENS_PER_ITEM = 1500 +BYTES_PER_TOKEN_TEXT_FACTOR = 4 +EXTRACTABLE_FRACTION = 0.4 + + +def estimateBootstrapCost(limits: Dict[str, int], kind: str = "files") -> Dict[str, Any]: + """Return an indicative cost estimate dict for a DataSource bootstrap. 
+ + Returned shape:: + + { + "estimatedTokens": int, + "estimatedUsd": float, # rounded to 4 decimals + "basis": { + "kind": "files"|"clickup", + "limits": {...}, + "assumptions": { + "embeddingUsdPerMToken": 0.02, + "charsPerToken": 4, + "extractableFraction": 0.4, + "tokensPerItem": 1500 # only for clickup-like item counts + }, + "notes": "non-binding, depends on real file content..." + } + } + """ + assumptions: Dict[str, Any] = { + "embeddingUsdPerMToken": EMBEDDING_USD_PER_MTOKEN, + "charsPerToken": CHARS_PER_TOKEN, + } + + if kind == "files": + maxBytes = int(limits.get("maxBytes") or 0) + extractableBytes = maxBytes * EXTRACTABLE_FRACTION + estimatedTokens = int(math.ceil(extractableBytes / BYTES_PER_TOKEN_TEXT_FACTOR)) + assumptions["extractableFraction"] = EXTRACTABLE_FRACTION + assumptions["formula"] = "ceil(maxBytes * 0.4 / 4)" + elif kind == "clickup": + maxTasks = int(limits.get("maxTasks") or 0) + maxWorkspaces = max(1, int(limits.get("maxWorkspaces") or 1)) + estimatedTokens = maxTasks * maxWorkspaces * DEFAULT_TOKENS_PER_ITEM + assumptions["tokensPerItem"] = DEFAULT_TOKENS_PER_ITEM + assumptions["formula"] = "maxTasks * maxWorkspaces * 1500" + else: + estimatedTokens = 0 + assumptions["formula"] = "unknown kind, returning zero" + + estimatedUsd = round(estimatedTokens / 1_000_000 * EMBEDDING_USD_PER_MTOKEN, 4) + + return { + "estimatedTokens": estimatedTokens, + "estimatedUsd": estimatedUsd, + "basis": { + "kind": kind, + "limits": dict(limits), + "assumptions": assumptions, + "notes": ( + "Indicative only. Actual cost depends on file types, extractable text " + "ratio, dedup hit-rate, retries, and current embedding model pricing." 
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Cascade-inherit semantics for DataSource flags (neutralize, ragIndexEnabled, scope).

Three-state flags allow tree elements to either set an explicit value or
inherit the value from their nearest ancestor in the path hierarchy. The
walker (RAG/Neutralize) and routes resolve the *effective* value; the cascade
helper resets explicit descendant values when a parent is toggled.

Path-traversal rules:
- A DataSource is identified by `(connectionId, sourceType, path)`.
- The root of a service tree is `path == '/'`.
- Sub-elements have paths like `/folder1/sub`. Their parent path is the
  longest prefix path that exists as a DataSource record (string-based).
- If no ancestor with an explicit value exists, the default is `False`
  (or `'personal'` for scope) — matching the legacy behavior of NULL = inherit.
"""

import logging
from typing import Any, Dict, Iterable, List, Optional, Tuple

logger = logging.getLogger(__name__)

_INHERITABLE_FLAGS = ("neutralize", "ragIndexEnabled", "scope")

# Connection-root DataSources carry the authority as their sourceType
# (e.g. 'msft', 'google'). They sit one level above all service DataSources
# of the same connection in the visual tree, so flag inheritance must
# cross sourceType boundaries — but ONLY from these authority roots.
_AUTHORITY_SOURCE_TYPES = frozenset({"local", "google", "msft", "clickup", "infomaniak"})


def _normalisePath(path: Optional[str]) -> str:
    """Normalise a DataSource path to '/'-prefixed, no trailing slash (except root).

    Falsy input and slash-only input (e.g. ``'//'``) both normalise to ``'/'``.
    """
    if not path:
        return "/"
    p = str(path).strip()
    if not p.startswith("/"):
        p = "/" + p
    # rstrip() reduces slash-only strings such as '//' to ''; that is still
    # the root, so fall back to '/' instead of returning an empty path.
    return p.rstrip("/") or "/"


def _flagDefault(flag: str) -> Any:
    """Return the static default used when neither the record nor any ancestor is explicit."""
    if flag == "scope":
        return "personal"
    return False


def _isExplicit(value: Any) -> bool:
    """Return True when `value` is an explicit flag value (not "inherit").

    `None` means inherit. Legacy rows may carry an empty-string scope; that is
    treated as inherit too.
    """
    if value is None:
        return False
    if isinstance(value, str) and value == "":
        return False
    return True


def _getRecordValue(rec: Any, key: str) -> Any:
    """Read `key` from a record that is either a dict or an attribute-style object."""
    if isinstance(rec, dict):
        return rec.get(key)
    return getattr(rec, key, None)


def _findAncestorChain(
    rec: Dict[str, Any],
    allDs: Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Return all ancestor DataSources of `rec` in the same connection,
    ordered nearest-first.

    Two ancestor relations are merged:
      1) **same-sourceType path-ancestor** — strict path-prefix within the
         same service tree (sharepointFolder, gmailFolder, ...).
      2) **connection-root ancestor** — a DS with `path='/'` and
         `sourceType` ∈ authority set (msft, google, ...) is the parent of
         every other DS in that connection regardless of sourceType, so a
         toggle on the connection node propagates to all services beneath.

    The connection-root is always the most distant ancestor and therefore
    sorts after any same-sourceType ancestors.
    """
    recId = _getRecordValue(rec, "id")
    recPath = _normalisePath(_getRecordValue(rec, "path"))
    recSourceType = _getRecordValue(rec, "sourceType")
    recConnectionId = _getRecordValue(rec, "connectionId")
    recIsConnectionRoot = recSourceType in _AUTHORITY_SOURCE_TYPES and recPath == "/"

    sameTypeCandidates: List[Tuple[int, Dict[str, Any]]] = []
    connectionRoot: Optional[Dict[str, Any]] = None
    for cand in allDs:
        if _getRecordValue(cand, "id") == recId:
            continue
        if _getRecordValue(cand, "connectionId") != recConnectionId:
            continue
        candSourceType = _getRecordValue(cand, "sourceType")
        candPath = _normalisePath(_getRecordValue(cand, "path"))
        if candSourceType == recSourceType:
            # _isAncestorPath is strict, so equal paths are rejected as well.
            if _isAncestorPath(candPath, recPath):
                sameTypeCandidates.append((len(candPath), cand))
        elif (
            not recIsConnectionRoot
            and candSourceType in _AUTHORITY_SOURCE_TYPES
            and candPath == "/"
        ):
            connectionRoot = cand

    # Longest path first == nearest ancestor first.
    sameTypeCandidates.sort(key=lambda x: x[0], reverse=True)
    chain = [c for _, c in sameTypeCandidates]
    if connectionRoot is not None:
        chain.append(connectionRoot)
    return chain


def _isAncestorPath(ancestor: str, descendant: str) -> bool:
    """True iff `ancestor` is a strict path-prefix of `descendant`.

    '/' is ancestor of every non-root path. For non-root prefixes, the
    descendant must continue with '/' so '/foo' isn't treated as ancestor of
    '/foobar'.
    """
    if ancestor == descendant:
        return False
    if ancestor == "/":
        return descendant != "/"
    return descendant.startswith(ancestor + "/")


def getEffectiveFlag(
    rec: Dict[str, Any],
    flag: str,
    sameConnectionDs: Iterable[Dict[str, Any]],
) -> Any:
    """Resolve the effective value of a flag via path-traversal.

    Order: own value (if explicit) → nearest ancestor with explicit value →
    static default (`False` or `'personal'`).

    `sameConnectionDs` must contain every DataSource of the record's
    connection (all sourceTypes), otherwise the connection-root ancestor
    cannot be found.

    Raises:
        ValueError: if `flag` is not one of the inheritable flags.
    """
    if flag not in _INHERITABLE_FLAGS:
        raise ValueError(f"Unknown inheritable flag: {flag}")
    own = _getRecordValue(rec, flag)
    if _isExplicit(own):
        return own
    for ancestor in _findAncestorChain(rec, sameConnectionDs):
        ancestorVal = _getRecordValue(ancestor, flag)
        if _isExplicit(ancestorVal):
            return ancestorVal
    return _flagDefault(flag)


def cascadeResetDescendants(
    rootIf: Any,
    parentRec: Dict[str, Any],
    flag: str,
) -> int:
    """Reset all explicit descendant values of `flag` to NULL (= inherit).

    Descendant relation mirrors `_findAncestorChain`:
    - Connection-root (`path='/'` AND `sourceType` ∈ authorities) is parent
      of every other DS in that connection (cross-sourceType cascade).
    - Otherwise: same-sourceType strict path-descendants only.

    Only the targeted `flag` is reset; other flags on the descendant are
    untouched. A failing update for one record is logged and skipped.

    Returns the number of records updated.

    Raises:
        ValueError: if `flag` is not one of the inheritable flags.
    """
    if flag not in _INHERITABLE_FLAGS:
        raise ValueError(f"Unknown inheritable flag: {flag}")
    from modules.datamodels.datamodelDataSource import DataSource

    connectionId = _getRecordValue(parentRec, "connectionId")
    parentSourceType = _getRecordValue(parentRec, "sourceType")
    parentPath = _normalisePath(_getRecordValue(parentRec, "path"))
    parentId = _getRecordValue(parentRec, "id")
    if not connectionId or not parentSourceType:
        return 0

    parentIsConnectionRoot = (
        parentSourceType in _AUTHORITY_SOURCE_TYPES and parentPath == "/"
    )

    siblings = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId})
    affected = 0
    for sib in siblings:
        sibId = _getRecordValue(sib, "id")
        if sibId == parentId:
            continue
        if not parentIsConnectionRoot:
            # Regular node: only same-service strict path-descendants qualify.
            # (A connection-root resets everything else under the connection.)
            if _getRecordValue(sib, "sourceType") != parentSourceType:
                continue
            sibPath = _normalisePath(_getRecordValue(sib, "path"))
            if not _isAncestorPath(parentPath, sibPath):
                continue
        if not _isExplicit(_getRecordValue(sib, flag)):
            continue
        try:
            rootIf.db.recordModify(DataSource, sibId, {flag: None})
            affected += 1
        except Exception as exc:
            # Best-effort cascade: keep going so one bad row does not block the rest.
            logger.warning("Cascade-reset failed for DataSource %s flag=%s: %s", sibId, flag, exc)
    if affected:
        logger.info(
            "Cascade-reset %s on %d descendants of DataSource (connectionId=%s, sourceType=%s, path=%s, connectionRoot=%s)",
            flag, affected, connectionId, parentSourceType, parentPath, parentIsConnectionRoot,
        )
    return affected


def _fdsClassify(fds: Dict[str, Any]) -> str:
    """Return 'workspace' | 'table' | 'record' based on the FDS identifier shape."""
    tableName = _getRecordValue(fds, "tableName") or ""
    recordFilter = _getRecordValue(fds, "recordFilter")
    if tableName == "*":
        return "workspace"
    if not recordFilter:
        return "table"
    return "record"


def _fdsIsAncestor(parent: Dict[str, Any], child: Dict[str, Any]) -> bool:
    """Return True iff `parent` FDS is a strict ancestor of `child` FDS.

    Hierarchy within one `workspaceInstanceId`:
      workspace-wildcard (tableName='*') → table-wildcard (tableName='X', !recordFilter)
                                         → record-fds (tableName='X', recordFilter.id=...)
      table-wildcard (tableName='X')     → record-fds (tableName='X', recordFilter.id=...)
    """
    parentWsId = _getRecordValue(parent, "workspaceInstanceId")
    childWsId = _getRecordValue(child, "workspaceInstanceId")
    if not parentWsId or parentWsId != childWsId:
        return False
    if _getRecordValue(parent, "id") == _getRecordValue(child, "id"):
        return False
    parentKind = _fdsClassify(parent)
    childKind = _fdsClassify(child)
    if parentKind == "workspace":
        return childKind in ("table", "record")
    if parentKind == "table":
        if childKind != "record":
            return False
        return _getRecordValue(parent, "tableName") == _getRecordValue(child, "tableName")
    return False


def getEffectiveFlagFds(
    rec: Dict[str, Any],
    flag: str,
    sameWorkspaceFds: Iterable[Dict[str, Any]],
) -> Any:
    """Resolve effective value of a FeatureDataSource flag.

    Order: own (if explicit) → table-wildcard (if explicit) →
    workspace-wildcard (if explicit) → static default.

    Raises:
        ValueError: if `flag` is not 'neutralize' or 'scope'.
    """
    if flag not in ("neutralize", "scope"):
        raise ValueError(f"Unknown inheritable FDS flag: {flag}")
    own = _getRecordValue(rec, flag)
    if _isExplicit(own):
        return own
    workspaceFds: List[Dict[str, Any]] = list(sameWorkspaceFds)
    ancestors = [a for a in workspaceFds if _fdsIsAncestor(a, rec)]
    # Table-wildcards are nearer than the workspace-wildcard.
    ancestors.sort(key=lambda a: 0 if _fdsClassify(a) == "table" else 1)
    for ancestor in ancestors:
        val = _getRecordValue(ancestor, flag)
        if _isExplicit(val):
            return val
    return _flagDefault(flag)


def cascadeResetDescendantsFds(
    rootIf: Any,
    parentRec: Dict[str, Any],
    flag: str,
) -> int:
    """Reset explicit `flag` to NULL on every descendant FDS of `parentRec`.

    Only the targeted flag is reset; other flags on descendants are untouched.
    A failing update for one record is logged and skipped.
    Returns the number of records updated.

    Raises:
        ValueError: if `flag` is not 'neutralize' or 'scope'.
    """
    if flag not in ("neutralize", "scope"):
        raise ValueError(f"Unknown inheritable FDS flag: {flag}")
    from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource

    workspaceInstanceId = _getRecordValue(parentRec, "workspaceInstanceId")
    if not workspaceInstanceId:
        return 0
    siblings = rootIf.db.getRecordset(
        FeatureDataSource, recordFilter={"workspaceInstanceId": workspaceInstanceId}
    )
    affected = 0
    for sib in siblings:
        if not _fdsIsAncestor(parentRec, sib):
            continue
        if not _isExplicit(_getRecordValue(sib, flag)):
            continue
        sibId = _getRecordValue(sib, "id")
        try:
            rootIf.db.recordModify(FeatureDataSource, sibId, {flag: None})
            affected += 1
        except Exception as exc:
            # Best-effort cascade: keep going so one bad row does not block the rest.
            logger.warning("FDS cascade-reset failed for %s flag=%s: %s", sibId, flag, exc)
    if affected:
        logger.info(
            "FDS cascade-reset %s on %d descendants of FDS (workspaceInstanceId=%s, kind=%s)",
            flag, affected, workspaceInstanceId, _fdsClassify(parentRec),
        )
    return affected


def buildEffectiveByConnection(
    dataSources: Iterable[Dict[str, Any]],
    flag: str,
) -> Dict[str, Any]:
    """Pre-compute the effective value of `flag` for every DataSource id.

    Useful for batch operations (walker, route DTOs) that touch many records
    at once. O(N²) in the worst case but N is bounded per connection.

    Records are grouped by `connectionId` only. Grouping by sourceType as
    well would hide the connection-root DataSource (which has a different
    sourceType) from its service DataSources and break the cross-sourceType
    inheritance that `getEffectiveFlag` implements.

    Returns:
        Mapping of DataSource id → effective flag value.

    Raises:
        ValueError: if `flag` is not one of the inheritable flags.
    """
    if flag not in _INHERITABLE_FLAGS:
        raise ValueError(f"Unknown inheritable flag: {flag}")
    byConnection: Dict[str, List[Dict[str, Any]]] = {}
    for ds in dataSources:
        connId = _getRecordValue(ds, "connectionId") or ""
        byConnection.setdefault(connId, []).append(ds)

    out: Dict[str, Any] = {}
    for group in byConnection.values():
        for rec in group:
            out[_getRecordValue(rec, "id")] = getEffectiveFlag(rec, flag, group)
    return out
+""" + +from modules.shared.i18nRegistry import t + +# Bootstrap walkers (one per connector family) +t("{n} Dateien verarbeitet, {indexed} indexiert") +t("{n} Tasks verarbeitet, {indexed} indexiert") +t("{n} Mails verarbeitet, {indexed} indexiert") + +# Ingestion consumer hand-offs +t("Verbindung wird aufgebaut ({authority})") +t("Synchronisierung läuft...") diff --git a/modules/serviceCenter/services/serviceKnowledge/_ragLimits.py b/modules/serviceCenter/services/serviceKnowledge/_ragLimits.py new file mode 100644 index 00000000..de0a4886 --- /dev/null +++ b/modules/serviceCenter/services/serviceKnowledge/_ragLimits.py @@ -0,0 +1,107 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Centralized RAG bootstrap limits + DataSource-scoped resolution. + +The original walkers (SharePoint, kDrive, gDrive, ClickUp) each carried their +own module-level `MAX_*_DEFAULT` constants and silently stopped indexing once +they were exceeded. That made it impossible for a user with a 500 MB folder to +override the 200 MB cap without a code change. + +This module is the single source of truth for two things: + +1. The canonical default budget per source kind (`FILES_LIMITS_DEFAULT`, + `CLICKUP_LIMITS_DEFAULT`). Walkers fall back to these when a DataSource has + no `settings.ragLimits` yet. + +2. The pure read/lazy-fill helpers that walkers and the API use to merge a + DataSource's stored settings with the defaults. No override layers, no + resolver chain: what is in `DataSource.settings.ragLimits` is what the + walker uses. + +Lazy fill: the first time a DataSource is processed, the defaults are written +to its `settings.ragLimits` so the UI shows real values immediately, even if +the user has never opened the settings modal. 
import logging
from typing import Any, Dict, Optional


logger = logging.getLogger(__name__)


FILES_LIMITS_DEFAULT: Dict[str, int] = {
    "maxItems": 500,
    "maxBytes": 200 * 1024 * 1024,
    "maxFileSize": 25 * 1024 * 1024,
    "maxDepth": 4,
}


CLICKUP_LIMITS_DEFAULT: Dict[str, int] = {
    "maxTasks": 500,
    "maxWorkspaces": 3,
    "maxListsPerWorkspace": 20,
}


_LIMITS_BY_KIND: Dict[str, Dict[str, int]] = {
    "files": FILES_LIMITS_DEFAULT,
    "clickup": CLICKUP_LIMITS_DEFAULT,
}


def getDefaults(kind: str) -> Dict[str, int]:
    """Return a fresh copy of the default budget for the given walker kind.

    `kind` is either "files" (Sharepoint, kDrive, gDrive) or "clickup".
    Returning a copy lets callers mutate the result safely.

    Raises:
        ValueError: if `kind` is not a known walker kind.
    """
    defaults = _LIMITS_BY_KIND.get(kind)
    if defaults is None:
        raise ValueError(f"Unknown RAG limit kind: {kind!r}")
    return dict(defaults)


def getStoredOverrides(dataSource: Optional[Dict[str, Any]], kind: str) -> Dict[str, int]:
    """Return ONLY the limits explicitly set on `dataSource.settings.ragLimits`.

    Missing keys are NOT filled with defaults — that is the caller's job (so
    a programmatically supplied `limits=` from a Caller still wins when the
    DataSource has no override). Pure read, no DB writes.

    Keys that are not part of the budget for `kind` are dropped. Values that
    cannot be interpreted as an integer limit are ignored with a warning:
    non-numeric values, booleans (which `int()` would silently coerce to
    0/1) and infinities (for which `int()` raises `OverflowError`).
    """
    if not isinstance(dataSource, dict):
        return {}
    settings = dataSource.get("settings") or {}
    if not isinstance(settings, dict):
        return {}
    stored = settings.get("ragLimits")
    if not isinstance(stored, dict):
        return {}
    allowed = set(_LIMITS_BY_KIND.get(kind, {}))
    out: Dict[str, int] = {}
    for key, raw in stored.items():
        if key not in allowed or raw is None:
            continue
        try:
            if isinstance(raw, bool):
                # bool is an int subclass; True must not become "limit = 1".
                raise TypeError("bool is not a valid limit")
            out[key] = int(raw)
        except (TypeError, ValueError, OverflowError):
            logger.warning(
                "Ignoring non-int ragLimits[%s]=%r on DataSource %s",
                key, raw, dataSource.get("id"),
            )
    return out


def getRagLimits(dataSource: Optional[Dict[str, Any]], kind: str) -> Dict[str, int]:
    """Effective RAG limits for the API/cost-estimate use-case.

    Stored overrides win over `getDefaults(kind)`. Walkers should NOT use this
    function — they should pass their own caller-limits as the fallback so that
    a runtime-supplied `limits=` parameter is honoured (see `getStoredOverrides`).

    Raises:
        ValueError: if `kind` is not a known walker kind.
    """
    base = getDefaults(kind)
    base.update(getStoredOverrides(dataSource, kind))
    return base
Walker iterates + over all DataSources whose effective value resolves to True, including + inherited ones. + + Returned dicts carry **resolved** flags (`neutralize`, `scope`) so the + downstream walkers can keep reading `ds.get("neutralize")` directly + without having to know about the inheritance chain. + + If `dataSourceIds` is provided (mini-bootstrap), the explicit set is + intersected with the effective-true set. """ from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelDataSource import DataSource + from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag rootIf = getRootInterface() allDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId}) + resolved = [] + for ds in allDs: + effRagIndex = getEffectiveFlag(ds, "ragIndexEnabled", allDs) + if effRagIndex is not True: + continue + dsCopy = dict(ds) if isinstance(ds, dict) else {**ds.__dict__} + dsCopy["neutralize"] = getEffectiveFlag(ds, "neutralize", allDs) + dsCopy["scope"] = getEffectiveFlag(ds, "scope", allDs) + dsCopy["ragIndexEnabled"] = True + resolved.append(dsCopy) if dataSourceIds: - return [ds for ds in allDs if ds.get("id") in dataSourceIds and ds.get("ragIndexEnabled")] - return [ds for ds in allDs if ds.get("ragIndexEnabled")] + resolved = [ds for ds in resolved if ds.get("id") in dataSourceIds] + return resolved async def _bootstrapJobHandler( @@ -167,7 +188,11 @@ async def _bootstrapJobHandler( if not connectionId: raise ValueError("connection.bootstrap requires payload.connectionId") - progressCb(5, f"resolving {authority} connection") + progressCb( + 5, + messageKey="Verbindung wird aufgebaut ({authority})", + messageParams={"authority": authority}, + ) # Defensive consent check try: @@ -225,7 +250,7 @@ async def _bootstrapJobHandler( bootstrapOutlook, ) - progressCb(0, "Synchronisierung läuft...") + progressCb(0, messageKey="Synchronisierung läuft...") spDs = _filterDs("sharepoint") olDs = 
_filterDs("outlook") async def _noopResult(): @@ -251,7 +276,7 @@ async def _bootstrapJobHandler( bootstrapGmail, ) - progressCb(0, "Synchronisierung läuft...") + progressCb(0, messageKey="Synchronisierung läuft...") gdDs = _filterDs("drive") gmDs = _filterDs("gmail") async def _noopResult(): @@ -274,7 +299,7 @@ async def _bootstrapJobHandler( bootstrapClickup, ) - progressCb(0, "Synchronisierung läuft...") + progressCb(0, messageKey="Synchronisierung läuft...") cuDs = _filterDs("clickup") cuResult = await bootstrapClickup(connectionId=connectionId, progressCb=progressCb, dataSources=cuDs) if cuDs else {"skipped": True, "reason": "no_datasources"} return { @@ -288,7 +313,7 @@ async def _bootstrapJobHandler( bootstrapKdrive, ) - progressCb(0, "Synchronisierung läuft...") + progressCb(0, messageKey="Synchronisierung läuft...") kdDs = _filterDs("kdrive") kdResult = await bootstrapKdrive(connectionId=connectionId, progressCb=progressCb, dataSources=kdDs) if kdDs else {"skipped": True, "reason": "no_datasources"} return { diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py index 959e42c9..28c24275 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py @@ -33,13 +33,21 @@ from modules.serviceCenter.services.serviceKnowledge.subWalkerHelpers import ( logger = logging.getLogger(__name__) -MAX_TASKS_DEFAULT = 500 -MAX_WORKSPACES_DEFAULT = 3 -MAX_LISTS_PER_WORKSPACE_DEFAULT = 20 +from modules.serviceCenter.services.serviceKnowledge import _ragLimits as _ragLimitsHelper + +_CLICKUP_DEFAULTS = _ragLimitsHelper.CLICKUP_LIMITS_DEFAULT +MAX_TASKS_DEFAULT = _CLICKUP_DEFAULTS["maxTasks"] +MAX_WORKSPACES_DEFAULT = _CLICKUP_DEFAULTS["maxWorkspaces"] +MAX_LISTS_PER_WORKSPACE_DEFAULT = _CLICKUP_DEFAULTS["maxListsPerWorkspace"] MAX_DESCRIPTION_CHARS_DEFAULT = 8000 
MAX_AGE_DAYS_DEFAULT = 180 +def _resolveDataSourceLimits(dsId: str, ds: Dict[str, Any]) -> Dict[str, int]: + """Return explicit RAG-limit overrides stored on the DataSource (or {}).""" + return _ragLimitsHelper.getStoredOverrides(ds, "clickup") + + @dataclass class ClickupBootstrapLimits: maxTasks: int = MAX_TASKS_DEFAULT @@ -236,10 +244,11 @@ async def bootstrapClickup( dsId = ds.get("id", "") dsNeutralize = ds.get("neutralize", False) + eff = _resolveDataSourceLimits(dsId, ds) dsLimits = ClickupBootstrapLimits( - maxTasks=limits.maxTasks, - maxWorkspaces=limits.maxWorkspaces, - maxListsPerWorkspace=limits.maxListsPerWorkspace, + maxTasks=eff.get("maxTasks", limits.maxTasks), + maxWorkspaces=eff.get("maxWorkspaces", limits.maxWorkspaces), + maxListsPerWorkspace=eff.get("maxListsPerWorkspace", limits.maxListsPerWorkspace), maxDescriptionChars=limits.maxDescriptionChars, maxAgeDays=limits.maxAgeDays, includeClosed=limits.includeClosed, @@ -520,7 +529,11 @@ async def _ingestTask( if hasattr(progressCb, "isCancelled") and progressCb.isCancelled(): return try: - progressCb(0, f"{processed} Tasks verarbeitet, {result.indexed} indexiert") + progressCb( + 0, + messageKey="{n} Tasks verarbeitet, {indexed} indexiert", + messageParams={"n": processed, "indexed": result.indexed}, + ) except Exception: pass if processed % 50 == 0: diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py index e27abacb..7600cce0 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py @@ -31,13 +31,21 @@ from modules.serviceCenter.services.serviceKnowledge.subWalkerHelpers import ( logger = logging.getLogger(__name__) -MAX_ITEMS_DEFAULT = 500 -MAX_BYTES_DEFAULT = 200 * 1024 * 1024 -MAX_FILE_SIZE_DEFAULT = 25 * 1024 * 1024 +from modules.serviceCenter.services.serviceKnowledge import 
def _resolveDataSourceLimits(dsId: str, ds: Dict[str, Any]) -> Dict[str, int]:
    """Fetch the RAG-limit overrides persisted on a Google Drive DataSource.

    Thin delegate to ``_ragLimitsHelper.getStoredOverrides`` using the shared
    ``"files"`` limit kind.

    Args:
        dsId: DataSource id. Part of the signature but unused in the body.
        ds: DataSource record as a plain dict.

    Returns:
        The helper's result, unmodified. The bootstrap code layers it over the
        runtime limits with ``eff.get(name, limits.<name>)``.
    """
    limitKind = "files"
    return _ragLimitsHelper.getStoredOverrides(ds, limitKind)
def _resolveDataSourceLimits(dsId: str, ds: Dict[str, Any]) -> Dict[str, int]:
    """Read the stored RAG-limit overrides for a kDrive DataSource.

    Forwards ``ds`` to ``_ragLimitsHelper.getStoredOverrides`` with the
    ``"files"`` limit kind; no post-processing happens here.

    Args:
        dsId: DataSource id (kept in the signature, not used by the lookup).
        ds: DataSource record as a plain dict.

    Returns:
        The helper's mapping. Keys missing from it fall back to the
        caller-supplied limits at the ``eff.get(...)`` call site.
    """
    overridesForFiles = _ragLimitsHelper.getStoredOverrides(ds, "files")
    return overridesForFiles
_resolveDataSourceLimits(dsId, ds) dsLimits = KdriveBootstrapLimits( - maxItems=limits.maxItems, - maxBytes=limits.maxBytes, - maxFileSize=limits.maxFileSize, + maxItems=eff.get("maxItems", limits.maxItems), + maxBytes=eff.get("maxBytes", limits.maxBytes), + maxFileSize=eff.get("maxFileSize", limits.maxFileSize), skipMimePrefixes=limits.skipMimePrefixes, - maxDepth=limits.maxDepth, + maxDepth=eff.get("maxDepth", limits.maxDepth), neutralize=dsNeutralize, ) @@ -416,7 +425,11 @@ async def _ingestOne( processed = result.indexed + result.skippedDuplicate if progressCb is not None and processed % 5 == 0: try: - progressCb(0, f"{processed} Dateien verarbeitet, {result.indexed} indexiert") + progressCb( + 0, + messageKey="{n} Dateien verarbeitet, {indexed} indexiert", + messageParams={"n": processed, "indexed": result.indexed}, + ) except Exception: pass diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py index 17220d97..e676b156 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py @@ -460,7 +460,11 @@ async def _ingestMessage( processed = result.indexed + result.skippedDuplicate if progressCb is not None and processed % 5 == 0: try: - progressCb(0, f"{processed} Mails verarbeitet, {result.indexed} indexiert") + progressCb( + 0, + messageKey="{n} Mails verarbeitet, {indexed} indexiert", + messageParams={"n": processed, "indexed": result.indexed}, + ) except Exception: pass if processed % 50 == 0: diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py index e06fd36b..87c4c92a 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py +++ 
def _resolveDataSourceLimits(dsId: str, ds: Dict[str, Any]) -> Dict[str, int]:
    """Return the explicit RAG-limit overrides stored on a SharePoint DataSource.

    The result is merged on top of the walker's runtime limits by the caller
    (``eff.get(name, limits.<name>)``). An empty mapping therefore means
    "keep the caller-supplied limits" -- this function never substitutes
    defaults of its own.

    Args:
        dsId: DataSource id. Present for call-site symmetry; unused here.
        ds: DataSource record as a plain dict.

    Returns:
        The mapping produced by ``_ragLimitsHelper.getStoredOverrides`` for
        the ``"files"`` limit kind.
    """
    limitKind = "files"
    userOverrides = _ragLimitsHelper.getStoredOverrides(ds, limitKind)
    return userOverrides
def resolveEffectiveNeutralize(
    ds: Dict[str, Any],
    allDataSources: List[Dict[str, Any]],
) -> bool:
    """DEPRECATED: use `getEffectiveFlag(ds, 'neutralize', allDataSources)`.

    Compatibility shim: asks ``getEffectiveFlag`` for the ``"neutralize"``
    flag of ``ds`` within ``allDataSources`` and coerces the answer to a
    plain ``bool``.
    """
    return bool(getEffectiveFlag(ds, "neutralize", allDataSources))
def resolveJobMessage(messageData: Optional[Dict[str, Any]], lang: Optional[str] = None) -> Optional[str]:
    """Translate a structured BackgroundJob progress payload.

    ``messageData`` shape (written by ``JobProgressCallback`` when callers
    pass ``messageKey`` / ``messageParams``)::

        {"key": "{n} Dateien verarbeitet, {indexed} indexiert",
         "params": {"n": 145, "indexed": 106}}

    The walker call sites use a string-literal ``messageKey=``; the matching
    ``t("...")`` literal lives in the feature's progress-key registration
    module so the boot sync picks it up.

    This helper is the server-side translation hop: route handlers deliver a
    fully rendered ``progressMessage`` string, so the frontend never calls
    ``t()`` on backend-supplied keys.

    Args:
        messageData: Structured payload, or ``None``. Anything that is not a
            non-empty dict with a non-empty string ``"key"`` yields ``None``.
        lang: Optional language code. When given, it is set on
            ``_CURRENT_LANGUAGE`` only for the duration of the ``t()`` lookup
            and always reset afterwards.

    Returns:
        The translated template with ``params`` substituted. If ``params`` is
        missing, empty, not a dict, or cannot be applied to the template, the
        untouched translated template is returned instead. ``None`` when the
        payload is unusable.
    """
    if not isinstance(messageData, dict) or not messageData:
        return None
    key = messageData.get("key")
    if not isinstance(key, str) or not key:
        return None

    if lang is None:
        template = t(key)
    else:
        token = _CURRENT_LANGUAGE.set(lang)
        try:
            template = t(key)
        finally:
            # Never leak the temporary language into the surrounding context.
            _CURRENT_LANGUAGE.reset(token)

    params = messageData.get("params")
    if not isinstance(params, dict) or not params:
        return template

    try:
        return template.format(**params)
    except (KeyError, IndexError, ValueError, AttributeError, TypeError):
        # Best effort: a translated template may reference a missing
        # placeholder (KeyError/IndexError), be malformed (ValueError), use
        # attribute/item access the param does not support
        # (AttributeError/TypeError), or params may carry non-string keys
        # (TypeError). A progress message must never break the request.
        return template
+""" +from __future__ import annotations + +import os +import sys +import json +from pathlib import Path + +_HERE = Path(__file__).resolve() +sys.path.insert(0, str(_HERE.parent.parent)) # gateway/ +os.chdir(_HERE.parent.parent) + +from modules.shared.configuration import APP_CONFIG # noqa: E402 +from modules.connectors.connectorDbPostgre import getCachedConnector # noqa: E402 +from modules.datamodels.datamodelBackgroundJob import BackgroundJob # noqa: E402 +from modules.routes.routeRagInventory import _flattenJobResult # noqa: E402 + + +def _main() -> None: + db = getCachedConnector( + dbDatabase=APP_CONFIG.get("DB_DATABASE", "poweron_app"), + dbHost=APP_CONFIG.get("DB_HOST", "localhost"), + dbPort=int(APP_CONFIG.get("DB_PORT", "5432")), + dbUser=APP_CONFIG.get("DB_USER"), + dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"), + ) + + rows = db.getRecordset(BackgroundJob) + rows = [r for r in rows if r.get("jobType") == "connection.bootstrap"] + rows = [r for r in rows if r.get("status") == "SUCCESS"] + rows.sort(key=lambda r: r.get("createdAt") or 0, reverse=True) + + if not rows: + print("No SUCCESS connection.bootstrap jobs found.") + return + + seenConnections: set[str] = set() + for j in rows: + connId = (j.get("payload") or {}).get("connectionId", "") + if connId in seenConnections: + continue + seenConnections.add(connId) + result = j.get("result") or {} + flat = _flattenJobResult(result) if isinstance(result, dict) else {} + print("=" * 80) + print(f"jobId = {j.get('id')}") + print(f"connectionId = {connId}") + print(f"finishedAt = {j.get('finishedAt')}") + print(f"raw keys = {sorted(result.keys()) if isinstance(result, dict) else 'N/A'}") + print("--- flattened (what the API will return now) ---") + print(f" indexed = {flat.get('indexed')}") + print(f" skippedDuplicate= {flat.get('skippedDuplicate')}") + print(f" skippedPolicy = {flat.get('skippedPolicy')}") + print(f" stoppedAtLimit = {flat.get('stoppedAtLimit')!r} <-- KEY CHECK") + print(f" limits = 
def _getConnection():
    """Open a psycopg2 connection from the ``DB_*`` entries of ``APP_CONFIG``.

    Host, port and database fall back to ``localhost`` / ``5432`` /
    ``poweron_app``; user and password have no fallback.
    """
    return psycopg2.connect(
        host=APP_CONFIG.get("DB_HOST", "localhost"),
        port=int(APP_CONFIG.get("DB_PORT", "5432")),
        database=APP_CONFIG.get("DB_DATABASE", "poweron_app"),
        user=APP_CONFIG.get("DB_USER"),
        password=APP_CONFIG.get("DB_PASSWORD_SECRET"),
    )


def _columnExists(cur, table: str, column: str) -> bool:
    """Return True when ``public.<table>`` has a column named ``column``."""
    cur.execute(
        """SELECT 1 FROM information_schema.columns
           WHERE table_schema = 'public' AND table_name = %s AND column_name = %s""",
        (table, column),
    )
    return cur.fetchone() is not None


def _tableExists(cur, table: str) -> bool:
    """Return True when a table named ``table`` exists in the ``public`` schema."""
    cur.execute(
        """SELECT 1 FROM information_schema.tables
           WHERE table_schema = 'public' AND table_name = %s""",
        (table,),
    )
    return cur.fetchone() is not None


def migrate(dryRun: bool = False):
    """Add the ``progressMessageData`` JSONB column to ``BackgroundJob``.

    The column is only added when the table exists and the column does not,
    so repeated runs are no-ops.

    Args:
        dryRun: When True the ALTER statement is logged but not executed and
            the transaction is rolled back.

    Raises:
        Whatever the database driver raises. The cursor and connection are
        closed in every case, so a failed run does not leak the connection.
    """
    table, column = "BackgroundJob", "progressMessageData"
    executed = []

    conn = _getConnection()
    try:
        conn.autocommit = False
        cur = conn.cursor()
        try:
            if not _tableExists(cur, table):
                logger.warning("SKIP: table %s does not exist yet (will be created on next ORM init)", table)
            elif _columnExists(cur, table, column):
                logger.info("SKIP: %s.%s already exists", table, column)
            else:
                sql = f'ALTER TABLE public."{table}" ADD COLUMN "{column}" JSONB DEFAULT NULL;'
                logger.info("EXEC: %s", sql)
                if not dryRun:
                    cur.execute(sql)
                executed.append(sql)

            if not dryRun and executed:
                conn.commit()
                logger.info("Migration committed (%d statements)", len(executed))
            elif dryRun and executed:
                conn.rollback()
                logger.info("DRY RUN -- would execute %d statements", len(executed))
            else:
                logger.info("Nothing to do -- schema already up to date")
        finally:
            cur.close()
    finally:
        # Runs on success and on failure alike; previously an exception
        # skipped both close() calls.
        conn.close()
def _getConnection():
    """Open a psycopg2 connection from the ``DB_*`` entries of ``APP_CONFIG``.

    Host, port and database fall back to ``localhost`` / ``5432`` /
    ``poweron_app``; user and password have no fallback.
    """
    return psycopg2.connect(
        host=APP_CONFIG.get("DB_HOST", "localhost"),
        port=int(APP_CONFIG.get("DB_PORT", "5432")),
        database=APP_CONFIG.get("DB_DATABASE", "poweron_app"),
        user=APP_CONFIG.get("DB_USER"),
        password=APP_CONFIG.get("DB_PASSWORD_SECRET"),
    )


def _tableExists(cur, table: str) -> bool:
    """Return True when a table named ``table`` exists in the ``public`` schema."""
    cur.execute(
        """SELECT 1 FROM information_schema.tables
           WHERE table_schema = 'public' AND table_name = %s""",
        (table,),
    )
    return cur.fetchone() is not None


def _columnExists(cur, table: str, column: str) -> bool:
    """Return True when ``public.<table>`` has a column named ``column``."""
    cur.execute(
        """SELECT 1 FROM information_schema.columns
           WHERE table_schema = 'public' AND table_name = %s AND column_name = %s""",
        (table, column),
    )
    return cur.fetchone() is not None


def _columnIsNullable(cur, table: str, column: str) -> bool:
    """Return True when ``public.<table>.<column>`` reports ``is_nullable = 'YES'``.

    A column that is not listed at all also yields False, so callers that
    need to tell "NOT NULL" from "missing" must check existence separately.
    """
    cur.execute(
        """SELECT is_nullable FROM information_schema.columns
           WHERE table_schema = 'public' AND table_name = %s AND column_name = %s""",
        (table, column),
    )
    row = cur.fetchone()
    if not row:
        return False
    return row[0] == "YES"


def migrate(dryRun: bool = False):
    """Drop NOT NULL on the cascade-inherit flag columns.

    Targets ``DataSource.neutralize/ragIndexEnabled/scope`` and
    ``FeatureDataSource.neutralize/scope``. A target is skipped when its
    table or column does not exist, or when the column is already nullable,
    so repeated runs are no-ops.

    Args:
        dryRun: When True the ALTER statements are logged but not executed
            and the transaction is rolled back.

    Raises:
        Whatever the database driver raises. The cursor and connection are
        closed in every case.
    """
    targets = [
        ("DataSource", "neutralize"),
        ("DataSource", "ragIndexEnabled"),
        ("DataSource", "scope"),
        ("FeatureDataSource", "neutralize"),
        ("FeatureDataSource", "scope"),
    ]
    executed = []

    conn = _getConnection()
    try:
        conn.autocommit = False
        cur = conn.cursor()
        try:
            for table, column in targets:
                if not _tableExists(cur, table):
                    logger.warning("SKIP: table %s does not exist yet", table)
                    continue
                if not _columnExists(cur, table, column):
                    # _columnIsNullable() reports False for a missing column;
                    # without this guard the ALTER below would fail and abort
                    # the whole transaction.
                    logger.warning("SKIP: column %s.%s does not exist yet", table, column)
                    continue
                if _columnIsNullable(cur, table, column):
                    logger.info("SKIP: %s.%s already nullable", table, column)
                    continue
                sql = f'ALTER TABLE public."{table}" ALTER COLUMN "{column}" DROP NOT NULL;'
                logger.info("EXEC: %s", sql)
                if not dryRun:
                    cur.execute(sql)
                executed.append(sql)

            if not dryRun and executed:
                conn.commit()
                logger.info("Migration committed (%d statements)", len(executed))
            elif dryRun and executed:
                conn.rollback()
                logger.info("DRY RUN -- would execute %d statements", len(executed))
            else:
                logger.info("Nothing to do -- schema already nullable")
        finally:
            cur.close()
    finally:
        # Runs on success and on failure alike; previously an exception
        # skipped both close() calls.
        conn.close()
def _getConnection():
    """Open a psycopg2 connection from the ``DB_*`` entries of ``APP_CONFIG``.

    Host, port and database fall back to ``localhost`` / ``5432`` /
    ``poweron_app``; user and password have no fallback.
    """
    return psycopg2.connect(
        host=APP_CONFIG.get("DB_HOST", "localhost"),
        port=int(APP_CONFIG.get("DB_PORT", "5432")),
        database=APP_CONFIG.get("DB_DATABASE", "poweron_app"),
        user=APP_CONFIG.get("DB_USER"),
        password=APP_CONFIG.get("DB_PASSWORD_SECRET"),
    )


def _columnExists(cur, table: str, column: str) -> bool:
    """Return True when ``public.<table>`` has a column named ``column``."""
    cur.execute(
        """SELECT 1 FROM information_schema.columns
           WHERE table_schema = 'public' AND table_name = %s AND column_name = %s""",
        (table, column),
    )
    return cur.fetchone() is not None


def _tableExists(cur, table: str) -> bool:
    """Return True when a table named ``table`` exists in the ``public`` schema."""
    cur.execute(
        """SELECT 1 FROM information_schema.tables
           WHERE table_schema = 'public' AND table_name = %s""",
        (table,),
    )
    return cur.fetchone() is not None


def migrate(dryRun: bool = False):
    """Add the ``settings`` JSONB column to ``DataSource`` and ``FeatureDataSource``.

    A target is skipped when its table is missing or the column already
    exists, so repeated runs are no-ops.

    Args:
        dryRun: When True the ALTER statements are logged but not executed
            and the transaction is rolled back.

    Raises:
        Whatever the database driver raises. The cursor and connection are
        closed in every case.
    """
    targets = [
        ("DataSource", "settings"),
        ("FeatureDataSource", "settings"),
    ]
    executed = []

    conn = _getConnection()
    try:
        conn.autocommit = False
        cur = conn.cursor()
        try:
            for table, column in targets:
                if not _tableExists(cur, table):
                    logger.warning("SKIP: table %s does not exist yet (will be created on next ORM init)", table)
                    continue
                if _columnExists(cur, table, column):
                    logger.info("SKIP: %s.%s already exists", table, column)
                    continue
                sql = f'ALTER TABLE public."{table}" ADD COLUMN "{column}" JSONB DEFAULT NULL;'
                logger.info("EXEC: %s", sql)
                if not dryRun:
                    cur.execute(sql)
                executed.append(sql)

            if not dryRun and executed:
                conn.commit()
                logger.info("Migration committed (%d statements)", len(executed))
            elif dryRun and executed:
                conn.rollback()
                logger.info("DRY RUN -- would execute %d statements", len(executed))
            else:
                logger.info("Nothing to do -- schema already up to date")
        finally:
            cur.close()
    finally:
        # Runs on success and on failure alike; previously an exception
        # skipped both close() calls.
        conn.close()
"maxListsPerWorkspace": 10}, + kind="clickup", + ) + expectedTokens = 100 * 2 * _costEstimate.DEFAULT_TOKENS_PER_ITEM + self.assertEqual(result["estimatedTokens"], expectedTokens) + + def test_unknown_kind_returns_zero(self): + result = _costEstimate.estimateBootstrapCost({}, kind="totally-unknown") + self.assertEqual(result["estimatedTokens"], 0) + self.assertEqual(result["estimatedUsd"], 0.0) + + def test_usd_is_rounded_4_decimals(self): + result = _costEstimate.estimateBootstrapCost({"maxBytes": 1024 * 1024}, kind="files") + rounded = round(result["estimatedUsd"], 4) + self.assertEqual(result["estimatedUsd"], rounded) + + def test_basis_includes_input_limits(self): + result = _costEstimate.estimateBootstrapCost({"maxBytes": 42}, kind="files") + self.assertEqual(result["basis"]["limits"]["maxBytes"], 42) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/services/test_inheritFlags.py b/tests/unit/services/test_inheritFlags.py new file mode 100644 index 00000000..b177e767 --- /dev/null +++ b/tests/unit/services/test_inheritFlags.py @@ -0,0 +1,330 @@ +"""Unit tests for `_inheritFlags` cascade-inherit helpers. 
+ +Verifies: +- getEffectiveFlag walks ancestors via path-prefix matching +- root default is False (or 'personal' for scope) when nothing explicit in chain +- only same-connectionId AND same-sourceType ancestors are considered +- cascadeResetDescendants only touches descendants with explicit values for THAT flag +- '/' is treated as ancestor of every non-root path +- '/foo' is NOT ancestor of '/foobar' (must require '/' separator) +""" + +from __future__ import annotations + +import unittest +from typing import List +from unittest.mock import MagicMock + +from modules.serviceCenter.services.serviceKnowledge import _inheritFlags + + +def _ds(idVal: str, path: str, **flags) -> dict: + """Build a DataSource dict with sensible defaults for a fixture.""" + base = { + "id": idVal, + "connectionId": "conn-1", + "sourceType": "sharepointFolder", + "path": path, + "neutralize": None, + "ragIndexEnabled": None, + "scope": None, + } + base.update(flags) + return base + + +class TestEffectiveFlag(unittest.TestCase): + def test_explicit_own_value_wins(self): + root = _ds("r", "/", neutralize=False) + leaf = _ds("l", "/folder/sub", neutralize=True) + self.assertTrue(_inheritFlags.getEffectiveFlag(leaf, "neutralize", [root, leaf])) + + def test_inherits_from_root_when_own_is_none(self): + root = _ds("r", "/", neutralize=True) + leaf = _ds("l", "/folder/sub") + self.assertTrue(_inheritFlags.getEffectiveFlag(leaf, "neutralize", [root, leaf])) + + def test_default_false_when_chain_empty(self): + leaf = _ds("l", "/folder/sub") + self.assertFalse(_inheritFlags.getEffectiveFlag(leaf, "neutralize", [leaf])) + + def test_nearest_ancestor_wins_over_distant(self): + root = _ds("r", "/", neutralize=False) + mid = _ds("m", "/folder", neutralize=True) + leaf = _ds("l", "/folder/sub") + self.assertTrue(_inheritFlags.getEffectiveFlag(leaf, "neutralize", [root, mid, leaf])) + + def test_different_connection_ignored(self): + otherConn = _ds("o", "/", connectionId="conn-2", neutralize=True) + leaf 
= _ds("l", "/folder") + self.assertFalse(_inheritFlags.getEffectiveFlag(leaf, "neutralize", [otherConn, leaf])) + + def test_different_sourcetype_ignored(self): + otherType = _ds("o", "/", sourceType="outlookFolder", neutralize=True) + leaf = _ds("l", "/folder") + self.assertFalse(_inheritFlags.getEffectiveFlag(leaf, "neutralize", [otherType, leaf])) + + def test_path_separator_required(self): + """`/foo` must NOT be ancestor of `/foobar` (no shared `/` boundary).""" + notAncestor = _ds("a", "/foo", neutralize=True) + leaf = _ds("l", "/foobar") + self.assertFalse(_inheritFlags.getEffectiveFlag(leaf, "neutralize", [notAncestor, leaf])) + + def test_root_is_ancestor_of_everything(self): + root = _ds("r", "/", neutralize=True) + leaf = _ds("l", "/anything/anywhere") + self.assertTrue(_inheritFlags.getEffectiveFlag(leaf, "neutralize", [root, leaf])) + + def test_scope_inheritance_with_string_default(self): + root = _ds("r", "/", scope="mandate") + leaf = _ds("l", "/folder") + self.assertEqual(_inheritFlags.getEffectiveFlag(leaf, "scope", [root, leaf]), "mandate") + + def test_scope_default_personal_when_empty(self): + leaf = _ds("l", "/folder") + self.assertEqual(_inheritFlags.getEffectiveFlag(leaf, "scope", [leaf]), "personal") + + def test_unknown_flag_raises(self): + leaf = _ds("l", "/") + with self.assertRaises(ValueError): + _inheritFlags.getEffectiveFlag(leaf, "unknownFlag", [leaf]) + + def test_explicit_false_overrides_inherited_true(self): + """Explicit False on a child must NOT cascade up to True from an ancestor.""" + root = _ds("r", "/", neutralize=True) + leaf = _ds("l", "/folder", neutralize=False) + self.assertFalse(_inheritFlags.getEffectiveFlag(leaf, "neutralize", [root, leaf])) + + def test_connection_root_inherits_cross_sourcetype(self): + """Connection-root (sourceType=authority, path='/') is ancestor of all DS in that connection.""" + connRoot = _ds("conn", "/", sourceType="msft", neutralize=True) + spService = _ds("sp", "/", 
sourceType="sharepointFolder") + olService = _ds("ol", "/", sourceType="outlookFolder") + self.assertTrue(_inheritFlags.getEffectiveFlag(spService, "neutralize", [connRoot, spService, olService])) + self.assertTrue(_inheritFlags.getEffectiveFlag(olService, "neutralize", [connRoot, spService, olService])) + + def test_same_sourcetype_ancestor_wins_over_connection_root(self): + """A same-sourceType service-root ancestor beats the connection-root.""" + connRoot = _ds("conn", "/", sourceType="msft", neutralize=True) + spRoot = _ds("sp", "/", sourceType="sharepointFolder", neutralize=False) + spLeaf = _ds("spl", "/sites/x", sourceType="sharepointFolder") + self.assertFalse(_inheritFlags.getEffectiveFlag(spLeaf, "neutralize", [connRoot, spRoot, spLeaf])) + + def test_connection_root_does_not_self_inherit(self): + """Connection-root has no ancestor — does not infinite-loop on itself.""" + connRoot = _ds("conn", "/", sourceType="msft") + self.assertFalse(_inheritFlags.getEffectiveFlag(connRoot, "neutralize", [connRoot])) + + +class TestCascadeReset(unittest.TestCase): + def _makeRootIf(self, dataSources: List[dict]): + rootIf = MagicMock() + rootIf.db.getRecordset = MagicMock(return_value=dataSources) + modified = [] + + def _modify(model, recordId, fields): + modified.append((recordId, fields)) + rootIf.db.recordModify = MagicMock(side_effect=_modify) + return rootIf, modified + + def test_resets_only_explicit_descendants(self): + parent = _ds("p", "/sites", neutralize=True) + explicitChild = _ds("c1", "/sites/folder1", neutralize=False) + inheritChild = _ds("c2", "/sites/folder2") # inherit -> not touched + sibling = _ds("s", "/other", neutralize=True) # NOT a descendant + rootIf, modified = self._makeRootIf([parent, explicitChild, inheritChild, sibling]) + + affected = _inheritFlags.cascadeResetDescendants(rootIf, parent, "neutralize") + + self.assertEqual(affected, 1) + self.assertEqual(modified, [("c1", {"neutralize": None})]) + + def 
test_does_not_touch_other_flags(self): + parent = _ds("p", "/sites", neutralize=True) + child = _ds("c", "/sites/sub", neutralize=False, ragIndexEnabled=True) + rootIf, modified = self._makeRootIf([parent, child]) + + _inheritFlags.cascadeResetDescendants(rootIf, parent, "neutralize") + + self.assertEqual(modified, [("c", {"neutralize": None})]) + # ragIndexEnabled and scope on the child must remain untouched. + + def test_does_not_cross_sourcetype(self): + """Non-connection-root parents stay within their sourceType for cascade.""" + parent = _ds("p", "/", neutralize=True, sourceType="sharepointFolder") + otherTypeDescendant = _ds("o", "/anything", neutralize=False, sourceType="outlookFolder") + rootIf, modified = self._makeRootIf([parent, otherTypeDescendant]) + + affected = _inheritFlags.cascadeResetDescendants(rootIf, parent, "neutralize") + + self.assertEqual(affected, 0) + self.assertEqual(modified, []) + + def test_connection_root_cascades_cross_sourcetype(self): + """Toggle on connection-root cascades into every explicit DS of that connection.""" + connRoot = _ds("conn", "/", sourceType="msft", neutralize=True) + spExplicit = _ds("sp", "/", sourceType="sharepointFolder", neutralize=False) + olInherit = _ds("ol", "/", sourceType="outlookFolder") + spLeafExplicit = _ds("sp-leaf", "/sites/x", sourceType="sharepointFolder", neutralize=True) + rootIf, modified = self._makeRootIf([connRoot, spExplicit, olInherit, spLeafExplicit]) + + affected = _inheritFlags.cascadeResetDescendants(rootIf, connRoot, "neutralize") + + # spExplicit and spLeafExplicit had explicit values → reset. olInherit untouched. 
+ self.assertEqual(affected, 2) + self.assertEqual({m[0] for m in modified}, {"sp", "sp-leaf"}) + for _, fields in modified: + self.assertEqual(fields, {"neutralize": None}) + + def test_unknown_flag_raises(self): + parent = _ds("p", "/", neutralize=True) + rootIf, _ = self._makeRootIf([parent]) + with self.assertRaises(ValueError): + _inheritFlags.cascadeResetDescendants(rootIf, parent, "unknownFlag") + + +def _fds(idVal: str, *, tableName: str, recordFilter=None, **flags) -> dict: + """Build a FeatureDataSource dict fixture.""" + base = { + "id": idVal, + "workspaceInstanceId": "ws-1", + "tableName": tableName, + "recordFilter": recordFilter, + "neutralize": None, + "scope": None, + } + base.update(flags) + return base + + +class TestFdsClassifyAndAncestry(unittest.TestCase): + def test_classify_workspace_wildcard(self): + self.assertEqual(_inheritFlags._fdsClassify(_fds("a", tableName="*")), "workspace") + + def test_classify_table_wildcard(self): + self.assertEqual(_inheritFlags._fdsClassify(_fds("a", tableName="Pos")), "table") + + def test_classify_record_specific(self): + rec = _fds("a", tableName="Pos", recordFilter={"id": "r-1"}) + self.assertEqual(_inheritFlags._fdsClassify(rec), "record") + + def test_workspace_is_ancestor_of_table_and_record(self): + ws = _fds("ws", tableName="*") + tbl = _fds("t", tableName="Pos") + rec = _fds("r", tableName="Pos", recordFilter={"id": "1"}) + self.assertTrue(_inheritFlags._fdsIsAncestor(ws, tbl)) + self.assertTrue(_inheritFlags._fdsIsAncestor(ws, rec)) + + def test_table_is_ancestor_of_record_same_table_only(self): + tbl = _fds("t", tableName="Pos") + recSame = _fds("r1", tableName="Pos", recordFilter={"id": "1"}) + recOther = _fds("r2", tableName="Other", recordFilter={"id": "1"}) + self.assertTrue(_inheritFlags._fdsIsAncestor(tbl, recSame)) + self.assertFalse(_inheritFlags._fdsIsAncestor(tbl, recOther)) + + def test_record_has_no_descendants(self): + rec = _fds("r", tableName="Pos", recordFilter={"id": "1"}) + tbl = 
_fds("t", tableName="Pos") + self.assertFalse(_inheritFlags._fdsIsAncestor(rec, tbl)) + + def test_no_cross_workspace_ancestry(self): + ws = _fds("ws", tableName="*", workspaceInstanceId="ws-A") + rec = _fds("r", tableName="Pos", recordFilter={"id": "1"}, workspaceInstanceId="ws-B") + self.assertFalse(_inheritFlags._fdsIsAncestor(ws, rec)) + + +class TestFdsEffectiveFlag(unittest.TestCase): + def test_own_explicit_wins(self): + ws = _fds("ws", tableName="*", neutralize=False) + rec = _fds("r", tableName="Pos", recordFilter={"id": "1"}, neutralize=True) + self.assertTrue(_inheritFlags.getEffectiveFlagFds(rec, "neutralize", [ws, rec])) + + def test_inherits_from_table_wildcard(self): + tbl = _fds("t", tableName="Pos", neutralize=True) + rec = _fds("r", tableName="Pos", recordFilter={"id": "1"}) + self.assertTrue(_inheritFlags.getEffectiveFlagFds(rec, "neutralize", [tbl, rec])) + + def test_table_wildcard_beats_workspace_wildcard(self): + ws = _fds("ws", tableName="*", neutralize=False) + tbl = _fds("t", tableName="Pos", neutralize=True) + rec = _fds("r", tableName="Pos", recordFilter={"id": "1"}) + self.assertTrue(_inheritFlags.getEffectiveFlagFds(rec, "neutralize", [ws, tbl, rec])) + + def test_workspace_wildcard_inherits_when_no_table(self): + ws = _fds("ws", tableName="*", neutralize=True) + rec = _fds("r", tableName="Pos", recordFilter={"id": "1"}) + self.assertTrue(_inheritFlags.getEffectiveFlagFds(rec, "neutralize", [ws, rec])) + + def test_default_false_when_chain_empty(self): + rec = _fds("r", tableName="Pos", recordFilter={"id": "1"}) + self.assertFalse(_inheritFlags.getEffectiveFlagFds(rec, "neutralize", [rec])) + + def test_unknown_flag_raises(self): + rec = _fds("r", tableName="*") + with self.assertRaises(ValueError): + _inheritFlags.getEffectiveFlagFds(rec, "ragIndexEnabled", [rec]) + + +class TestFdsCascadeReset(unittest.TestCase): + def _makeRootIf(self, fdses): + rootIf = MagicMock() + rootIf.db.getRecordset = MagicMock(return_value=fdses) + modified 
= [] + + def _modify(model, recordId, fields): + modified.append((recordId, fields)) + rootIf.db.recordModify = MagicMock(side_effect=_modify) + return rootIf, modified + + def test_workspace_cascades_to_all_explicit_descendants(self): + ws = _fds("ws", tableName="*", neutralize=True) + tblExplicit = _fds("t", tableName="Pos", neutralize=False) + tblInherit = _fds("t2", tableName="Other") + recExplicit = _fds("r", tableName="Pos", recordFilter={"id": "1"}, neutralize=True) + rootIf, modified = self._makeRootIf([ws, tblExplicit, tblInherit, recExplicit]) + + affected = _inheritFlags.cascadeResetDescendantsFds(rootIf, ws, "neutralize") + + self.assertEqual(affected, 2) + self.assertEqual({m[0] for m in modified}, {"t", "r"}) + + def test_table_cascades_only_to_same_table_records(self): + tbl = _fds("t", tableName="Pos", neutralize=True) + recSame = _fds("r1", tableName="Pos", recordFilter={"id": "1"}, neutralize=False) + recOther = _fds("r2", tableName="Other", recordFilter={"id": "1"}, neutralize=False) + rootIf, modified = self._makeRootIf([tbl, recSame, recOther]) + + affected = _inheritFlags.cascadeResetDescendantsFds(rootIf, tbl, "neutralize") + + self.assertEqual(affected, 1) + self.assertEqual(modified, [("r1", {"neutralize": None})]) + + def test_record_has_no_cascade(self): + rec = _fds("r", tableName="Pos", recordFilter={"id": "1"}, neutralize=True) + rootIf, modified = self._makeRootIf([rec]) + affected = _inheritFlags.cascadeResetDescendantsFds(rootIf, rec, "neutralize") + self.assertEqual(affected, 0) + self.assertEqual(modified, []) + + def test_unknown_flag_raises(self): + ws = _fds("ws", tableName="*", neutralize=True) + rootIf, _ = self._makeRootIf([ws]) + with self.assertRaises(ValueError): + _inheritFlags.cascadeResetDescendantsFds(rootIf, ws, "ragIndexEnabled") + + +class TestPathNormalization(unittest.TestCase): + def test_empty_path_normalises_to_root(self): + self.assertEqual(_inheritFlags._normalisePath(""), "/") + 
self.assertEqual(_inheritFlags._normalisePath(None), "/") + + def test_trailing_slash_stripped(self): + self.assertEqual(_inheritFlags._normalisePath("/foo/"), "/foo") + self.assertEqual(_inheritFlags._normalisePath("/"), "/") + + def test_leading_slash_added(self): + self.assertEqual(_inheritFlags._normalisePath("foo/bar"), "/foo/bar") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/services/test_knowledge_ingest_consumer.py b/tests/unit/services/test_knowledge_ingest_consumer.py index 6b27a6e8..9884079e 100644 --- a/tests/unit/services/test_knowledge_ingest_consumer.py +++ b/tests/unit/services/test_knowledge_ingest_consumer.py @@ -99,11 +99,18 @@ def test_onConnectionRevoked_ignores_missing_id(monkeypatch): assert seen == [] +def _stubRagEnabledDs(monkeypatch, dataSources): + """Stub _loadRagEnabledDataSources so tests don't need a live DB.""" + monkeypatch.setattr(consumer, "_loadRagEnabledDataSources", lambda *_, **__: dataSources) + + def test_bootstrap_job_skips_unsupported_authority(monkeypatch): + _stubRagEnabledDs(monkeypatch, [{"id": "ds1", "sourceType": "unknownType"}]) + async def _run(): result = await consumer._bootstrapJobHandler( {"payload": {"connectionId": "c1", "authority": "slack"}}, - lambda *_: None, + lambda *_, **__: None, ) return result @@ -114,13 +121,18 @@ def test_bootstrap_job_skips_unsupported_authority(monkeypatch): def test_bootstrap_job_dispatches_msft_parts(monkeypatch): + _stubRagEnabledDs(monkeypatch, [ + {"id": "ds1", "sourceType": "sharepointFolder"}, + {"id": "ds2", "sourceType": "outlookFolder"}, + ]) + calls = {"sp": 0, "ol": 0} - async def _fakeSp(connectionId, progressCb=None): + async def _fakeSp(connectionId, progressCb=None, dataSources=None): calls["sp"] += 1 return {"indexed": 1} - async def _fakeOl(connectionId, progressCb=None): + async def _fakeOl(connectionId, progressCb=None, dataSources=None): calls["ol"] += 1 return {"indexed": 2} @@ -142,7 +154,7 @@ def 
test_bootstrap_job_dispatches_msft_parts(monkeypatch): async def _run(): return await consumer._bootstrapJobHandler( {"payload": {"connectionId": "c1", "authority": "msft"}}, - lambda *_: None, + lambda *_, **__: None, ) result = asyncio.run(_run()) @@ -152,13 +164,18 @@ def test_bootstrap_job_dispatches_msft_parts(monkeypatch): def test_bootstrap_job_dispatches_google_parts(monkeypatch): + _stubRagEnabledDs(monkeypatch, [ + {"id": "ds1", "sourceType": "googleDriveFolder"}, + {"id": "ds2", "sourceType": "gmailFolder"}, + ]) + calls = {"gd": 0, "gm": 0} - async def _fakeGd(connectionId, progressCb=None): + async def _fakeGd(connectionId, progressCb=None, dataSources=None): calls["gd"] += 1 return {"indexed": 7} - async def _fakeGm(connectionId, progressCb=None): + async def _fakeGm(connectionId, progressCb=None, dataSources=None): calls["gm"] += 1 return {"indexed": 11} @@ -180,7 +197,7 @@ def test_bootstrap_job_dispatches_google_parts(monkeypatch): async def _run(): return await consumer._bootstrapJobHandler( {"payload": {"connectionId": "c1", "authority": "google"}}, - lambda *_: None, + lambda *_, **__: None, ) result = asyncio.run(_run()) @@ -190,9 +207,13 @@ def test_bootstrap_job_dispatches_google_parts(monkeypatch): def test_bootstrap_job_dispatches_clickup_part(monkeypatch): + _stubRagEnabledDs(monkeypatch, [ + {"id": "ds1", "sourceType": "clickupList"}, + ]) + calls = {"cu": 0} - async def _fakeCu(connectionId, progressCb=None): + async def _fakeCu(connectionId, progressCb=None, dataSources=None): calls["cu"] += 1 return {"indexed": 4} @@ -207,7 +228,7 @@ def test_bootstrap_job_dispatches_clickup_part(monkeypatch): async def _run(): return await consumer._bootstrapJobHandler( {"payload": {"connectionId": "c1", "authority": "clickup"}}, - lambda *_: None, + lambda *_, **__: None, ) result = asyncio.run(_run()) diff --git a/tests/unit/services/test_ragLimits.py b/tests/unit/services/test_ragLimits.py new file mode 100644 index 00000000..bb336ed3 --- /dev/null 
+++ b/tests/unit/services/test_ragLimits.py @@ -0,0 +1,79 @@ +"""Unit tests for `_ragLimits` central helpers. + +Verifies: +- defaults are returned as fresh copies (no mutation leakage) +- getStoredOverrides returns ONLY explicit overrides (walker contract) +- getRagLimits merges defaults with overrides (API/cost-estimate contract) +- non-int values in stored settings are dropped, not silently coerced +""" + +from __future__ import annotations + +import unittest + +from modules.serviceCenter.services.serviceKnowledge import _ragLimits + + +class TestGetDefaults(unittest.TestCase): + def test_files_defaults_have_all_keys(self): + d = _ragLimits.getDefaults("files") + self.assertEqual(set(d.keys()), {"maxItems", "maxBytes", "maxFileSize", "maxDepth"}) + self.assertEqual(d["maxBytes"], 200 * 1024 * 1024) + + def test_clickup_defaults(self): + d = _ragLimits.getDefaults("clickup") + self.assertEqual(set(d.keys()), {"maxTasks", "maxWorkspaces", "maxListsPerWorkspace"}) + + def test_defaults_are_a_fresh_copy(self): + d1 = _ragLimits.getDefaults("files") + d1["maxBytes"] = 1 + d2 = _ragLimits.getDefaults("files") + self.assertEqual(d2["maxBytes"], 200 * 1024 * 1024) + + def test_unknown_kind_raises(self): + with self.assertRaises(ValueError): + _ragLimits.getDefaults("unknown") + + +class TestGetStoredOverrides(unittest.TestCase): + def test_no_settings_returns_empty_dict(self): + self.assertEqual(_ragLimits.getStoredOverrides({"id": "x", "settings": None}, "files"), {}) + + def test_only_explicit_overrides_returned(self): + ds = {"id": "x", "settings": {"ragLimits": {"maxBytes": 999}}} + self.assertEqual(_ragLimits.getStoredOverrides(ds, "files"), {"maxBytes": 999}) + + def test_unknown_keys_dropped(self): + ds = {"id": "x", "settings": {"ragLimits": {"maxBytes": 999, "bogus": 1}}} + self.assertEqual(_ragLimits.getStoredOverrides(ds, "files"), {"maxBytes": 999}) + + def test_non_int_dropped(self): + ds = {"id": "x", "settings": {"ragLimits": {"maxBytes": 
"not-a-number"}}} + self.assertEqual(_ragLimits.getStoredOverrides(ds, "files"), {}) + + def test_none_or_garbage_settings_safe(self): + self.assertEqual(_ragLimits.getStoredOverrides(None, "files"), {}) + self.assertEqual(_ragLimits.getStoredOverrides({"id": "x", "settings": "garbage"}, "files"), {}) + + +class TestGetRagLimits(unittest.TestCase): + def test_no_settings_returns_defaults(self): + result = _ragLimits.getRagLimits({"id": "x", "settings": None}, "files") + self.assertEqual(result, _ragLimits.FILES_LIMITS_DEFAULT) + + def test_partial_override_merges_with_defaults(self): + ds = {"id": "x", "settings": {"ragLimits": {"maxBytes": 999}}} + result = _ragLimits.getRagLimits(ds, "files") + self.assertEqual(result["maxBytes"], 999) + self.assertEqual(result["maxItems"], _ragLimits.FILES_LIMITS_DEFAULT["maxItems"]) + + def test_caller_can_distinguish_unset_from_set(self): + """Walker contract: an unset key MUST NOT appear in `getStoredOverrides`.""" + ds = {"id": "x", "settings": {"ragLimits": {"maxBytes": 999}}} + overrides = _ragLimits.getStoredOverrides(ds, "files") + self.assertIn("maxBytes", overrides) + self.assertNotIn("maxItems", overrides) + + +if __name__ == "__main__": + unittest.main()