diff --git a/modules/routes/routeAdminDatabaseHealth.py b/modules/routes/routeAdminDatabaseHealth.py index 02343e83..c547cc76 100644 --- a/modules/routes/routeAdminDatabaseHealth.py +++ b/modules/routes/routeAdminDatabaseHealth.py @@ -7,6 +7,7 @@ and database migration (backup / restore). import json import logging +import os from typing import Any, Dict, List, Optional from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile, status @@ -570,13 +571,34 @@ async def postMigrationUploadImport( fileSizeMb = round(totalBytes / (1024 * 1024), 1) logger.info("SysAdmin migration upload-import: %s bytes on disk (%.1f MB)", totalBytes, fileSizeMb) - _pendingProcessing[token] = {"filePath": filePath, "tmpDir": tmpDir} + _writeTokenMeta(token, "processing", {"filePath": filePath, "tmpDir": tmpDir}) return {"token": token, "fileSizeMb": fileSizeMb} -_pendingProcessing: Dict[str, dict] = {} -_pendingImports: Dict[str, dict] = {} +def _tokenMetaPath(token: str, kind: str) -> str: + import tempfile + return os.path.join(tempfile.gettempdir(), f"poweron_{kind}_{token}.meta.json") + + +def _writeTokenMeta(token: str, kind: str, data: dict): + path = _tokenMetaPath(token, kind) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False) + + +def _readTokenMeta(token: str, kind: str, pop: bool = False) -> dict | None: + path = _tokenMetaPath(token, kind) + if not os.path.exists(path): + return None + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + if pop: + try: + os.remove(path) + except OSError: + pass + return data @router.get("/migration/process-import-stream") @@ -598,7 +620,7 @@ def getProcessImportStream( import queue import threading - pending = _pendingProcessing.pop(token, None) + pending = _readTokenMeta(token, "processing", pop=True) if not pending: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired processing token.") @@ -643,10 +665,10 @@ def getProcessImportStream( except OSError: pass - _pendingImports[token] = { + _writeTokenMeta(token, "import", { "dbFiles": dbFiles, "protectedIds": protectedIds, - } + }) q.put({"phase": "done", "result": { "token": token, @@ -704,7 +726,7 @@ def postMigrationImportSingle( if mode not in ("replace", "merge"): raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid mode: '{mode}'.") - pending = _pendingImports.get(token) + pending = _readTokenMeta(token, "import") if not pending: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired import token.") @@ -749,7 +771,7 @@ def postMigrationImportDone( import os token = body.get("token", "") - pending = _pendingImports.pop(token, None) + pending = _readTokenMeta(token, "import", pop=True) if pending: for dbEntry in pending.get("dbFiles", {}).values(): if isinstance(dbEntry, str): diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py index be059eef..056a97cb 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py @@ -140,6 +140,21 @@ _SOURCE_TYPE_MAP = { } +def _findConnectionRootRagLimits(allDs) -> dict: + """Return ragLimits from the connection root (path='/') if set, else {}.""" + for ds in allDs: + dsDict = ds if isinstance(ds, dict) else {**ds.__dict__} + path = dsDict.get("path", "") + if path not in ("/", ""): + continue + settings = dsDict.get("settings") + if isinstance(settings, dict): + limits = settings.get("ragLimits") + if isinstance(limits, dict) and limits: + return dict(limits) + return {} + + def _loadRagEnabledDataSources(connectionId: str, dataSourceIds: Optional[list] = None): """Load DataSource rows whose *effective* ragIndexEnabled is True. @@ -161,6 +176,8 @@ def _loadRagEnabledDataSources(connectionId: str, dataSourceIds: Optional[list] rootIf = getRootInterface() allDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId}) + connectionRootLimits = _findConnectionRootRagLimits(allDs) + resolved = [] for ds in allDs: effRagIndex = getEffectiveFlag(ds, "ragIndexEnabled", allDs) @@ -170,6 +187,15 @@ def _loadRagEnabledDataSources(connectionId: str, dataSourceIds: Optional[list] dsCopy["neutralize"] = getEffectiveFlag(ds, "neutralize", allDs) dsCopy["scope"] = getEffectiveFlag(ds, "scope", allDs) dsCopy["ragIndexEnabled"] = True + + if connectionRootLimits: + dsSettings = dsCopy.get("settings") or {} + if not isinstance(dsSettings, dict): + dsSettings = {} + ownLimits = dsSettings.get("ragLimits") + if not isinstance(ownLimits, dict) or not ownLimits: + dsCopy["settings"] = {**dsSettings, "ragLimits": connectionRootLimits} + resolved.append(dsCopy) if dataSourceIds: resolved = [ds for ds in resolved if ds.get("id") in dataSourceIds] diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py index 28c24275..7e14e13e 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py @@ -234,6 +234,7 @@ async def bootstrapClickup( teams = (teamsResp or {}).get("teams") or [] cancelled = False + effectiveLimits = limits for ds in dataSources: if result.indexed + result.skippedDuplicate >= limits.maxTasks: _recordLimitStop(result, "maxTasks", "dataSource", limits) @@ -255,6 +256,7 @@ async def bootstrapClickup( neutralize=dsNeutralize, clickupScope=limits.clickupScope, ) + effectiveLimits = dsLimits if len(teams) > dsLimits.maxWorkspaces: _recordLimitStop(result, "maxWorkspaces", "teams", dsLimits, hard=False) @@ -283,7 +285,7 @@ async def bootstrapClickup( logger.error("clickup team %s walk failed: %s", teamId, exc, exc_info=True) result.errors.append(f"team({teamId}): {exc}") - finalResult = _finalizeResult(connectionId, result, startMs) + finalResult = _finalizeResult(connectionId, result, startMs, effectiveLimits) if cancelled: finalResult["cancelled"] = True return finalResult @@ -574,7 +576,12 @@ def _recordLimitStop( ) -def _finalizeResult(connectionId: str, result: ClickupBootstrapResult, startMs: float) -> Dict[str, Any]: +def _finalizeResult( + connectionId: str, + result: ClickupBootstrapResult, + startMs: float, + effectiveLimits: Optional[ClickupBootstrapLimits] = None, +) -> Dict[str, Any]: durationMs = int((time.time() - startMs) * 1000) logger.info( "ingestion.connection.bootstrap.done part=clickup connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d workspaces=%d lists=%d durationMs=%d stoppedAtLimit=%s", @@ -608,9 +615,9 @@ def _finalizeResult(connectionId: str, result: ClickupBootstrapResult, startMs: "errors": result.errors[:20], "stoppedAtLimit": result.stoppedAtLimit, "limits": { - "maxTasks": MAX_TASKS_DEFAULT, - "maxWorkspaces": MAX_WORKSPACES_DEFAULT, - "maxListsPerWorkspace": MAX_LISTS_PER_WORKSPACE_DEFAULT, - "maxAgeDays": MAX_AGE_DAYS_DEFAULT, + "maxTasks": effectiveLimits.maxTasks if effectiveLimits else MAX_TASKS_DEFAULT, + "maxWorkspaces": effectiveLimits.maxWorkspaces if effectiveLimits else MAX_WORKSPACES_DEFAULT, + "maxListsPerWorkspace": effectiveLimits.maxListsPerWorkspace if effectiveLimits else MAX_LISTS_PER_WORKSPACE_DEFAULT, + "maxAgeDays": effectiveLimits.maxAgeDays if effectiveLimits else MAX_AGE_DAYS_DEFAULT, }, } diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py index 7600cce0..08f51ecd 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py @@ -172,6 +172,7 @@ async def bootstrapGdrive( userId = str(getattr(connection, "userId", "") or "") if connection is not None else "" cancelled = False + effectiveLimits = limits for ds in dataSources: if result.indexed + result.skippedDuplicate >= limits.maxItems: break @@ -193,6 +194,7 @@ async def bootstrapGdrive( maxAgeDays=dsMaxAgeDays, neutralize=dsNeutralize, ) + effectiveLimits = dsLimits try: await _walkFolder( @@ -213,7 +215,7 @@ async def bootstrapGdrive( logger.error("gdrive walk failed for ds %s path %s: %s", dsId, dsPath, exc, exc_info=True) result.errors.append(f"walk({dsPath}): {exc}") - finalResult = _finalizeResult(connectionId, result, startMs) + finalResult = _finalizeResult(connectionId, result, startMs, effectiveLimits) if cancelled: finalResult["cancelled"] = True return finalResult @@ -515,7 +517,12 @@ def _recordLimitStop( ) -def _finalizeResult(connectionId: str, result: GdriveBootstrapResult, startMs: float) -> Dict[str, Any]: +def _finalizeResult( + connectionId: str, + result: GdriveBootstrapResult, + startMs: float, + effectiveLimits: Optional[GdriveBootstrapLimits] = None, +) -> Dict[str, Any]: durationMs = int((time.time() - startMs) * 1000) logger.info( "ingestion.connection.bootstrap.done part=gdrive connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d bytes=%d durationMs=%d stoppedAtLimit=%s", @@ -547,9 +554,9 @@ def _finalizeResult(connectionId: str, result: GdriveBootstrapResult, startMs: f "errors": result.errors[:20], "stoppedAtLimit": result.stoppedAtLimit, "limits": { - "maxItems": MAX_ITEMS_DEFAULT, - "maxBytes": MAX_BYTES_DEFAULT, - "maxFileSize": MAX_FILE_SIZE_DEFAULT, - "maxDepth": MAX_DEPTH_DEFAULT, + "maxItems": effectiveLimits.maxItems if effectiveLimits else MAX_ITEMS_DEFAULT, + "maxBytes": effectiveLimits.maxBytes if effectiveLimits else MAX_BYTES_DEFAULT, + "maxFileSize": effectiveLimits.maxFileSize if effectiveLimits else MAX_FILE_SIZE_DEFAULT, + "maxDepth": effectiveLimits.maxDepth if effectiveLimits else MAX_DEPTH_DEFAULT, }, } diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py index f95aafd1..1235ed18 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py @@ -141,6 +141,7 @@ async def bootstrapKdrive( userId = str(getattr(connection, "userId", "") or "") if connection is not None else "" cancelled = False + effectiveLimits = limits for ds in dataSources: if result.indexed + result.skippedDuplicate >= limits.maxItems: break @@ -160,6 +161,7 @@ async def bootstrapKdrive( maxDepth=eff.get("maxDepth", limits.maxDepth), neutralize=dsNeutralize, ) + effectiveLimits = dsLimits try: await _walkFolder( @@ -180,7 +182,7 @@ async def bootstrapKdrive( logger.error("kdrive walk failed for ds %s path %s: %s", dsId, dsPath, exc, exc_info=True) result.errors.append(f"walk({dsPath}): {exc}") - finalResult = _finalizeResult(connectionId, result, startMs) + finalResult = _finalizeResult(connectionId, result, startMs, effectiveLimits) if cancelled: finalResult["cancelled"] = True return finalResult @@ -460,7 +462,12 @@ def _recordLimitStop( ) -def _finalizeResult(connectionId: str, result: KdriveBootstrapResult, startMs: float) -> Dict[str, Any]: +def _finalizeResult( + connectionId: str, + result: KdriveBootstrapResult, + startMs: float, + effectiveLimits: Optional[KdriveBootstrapLimits] = None, +) -> Dict[str, Any]: durationMs = int((time.time() - startMs) * 1000) logger.info( "ingestion.connection.bootstrap.done part=kdrive connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d durationMs=%d stoppedAtLimit=%s", @@ -484,9 +491,9 @@ def _finalizeResult(connectionId: str, result: KdriveBootstrapResult, startMs: f "errors": result.errors[:20], "stoppedAtLimit": result.stoppedAtLimit, "limits": { - "maxItems": MAX_ITEMS_DEFAULT, - "maxBytes": MAX_BYTES_DEFAULT, - "maxFileSize": MAX_FILE_SIZE_DEFAULT, - "maxDepth": MAX_DEPTH_DEFAULT, + "maxItems": effectiveLimits.maxItems if effectiveLimits else MAX_ITEMS_DEFAULT, + "maxBytes": effectiveLimits.maxBytes if effectiveLimits else MAX_BYTES_DEFAULT, + "maxFileSize": effectiveLimits.maxFileSize if effectiveLimits else MAX_FILE_SIZE_DEFAULT, + "maxDepth": effectiveLimits.maxDepth if effectiveLimits else MAX_DEPTH_DEFAULT, }, } diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py index 87c4c92a..6f69d171 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py @@ -168,6 +168,7 @@ async def bootstrapSharepoint( userId = str(getattr(connection, "userId", "") or "") if connection is not None else "" cancelled = False + effectiveLimits = limits for ds in dataSources: if result.indexed + result.skippedDuplicate >= limits.maxItems: break @@ -188,6 +189,7 @@ async def bootstrapSharepoint( maxSites=limits.maxSites, neutralize=dsNeutralize, ) + effectiveLimits = dsLimits try: await _walkFolder( @@ -208,7 +210,7 @@ async def bootstrapSharepoint( logger.error("sharepoint walk failed for ds %s path %s: %s", dsId, dsPath, exc, exc_info=True) result.errors.append(f"walk({dsPath}): {exc}") - finalResult = _finalizeResult(connectionId, result, startMs) + finalResult = _finalizeResult(connectionId, result, startMs, effectiveLimits) if cancelled: finalResult["cancelled"] = True return finalResult @@ -505,7 +507,12 @@ def _recordLimitStop( ) -def _finalizeResult(connectionId: str, result: SharepointBootstrapResult, startMs: float) -> Dict[str, Any]: +def _finalizeResult( + connectionId: str, + result: SharepointBootstrapResult, + startMs: float, + effectiveLimits: Optional[SharepointBootstrapLimits] = None, +) -> Dict[str, Any]: durationMs = int((time.time() - startMs) * 1000) logger.info( "ingestion.connection.bootstrap.done part=sharepoint connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d durationMs=%d stoppedAtLimit=%s", @@ -535,9 +542,9 @@ def _finalizeResult(connectionId: str, result: SharepointBootstrapResult, startM "errors": result.errors[:20], "stoppedAtLimit": result.stoppedAtLimit, "limits": { - "maxItems": MAX_ITEMS_DEFAULT, - "maxBytes": MAX_BYTES_DEFAULT, - "maxFileSize": MAX_FILE_SIZE_DEFAULT, - "maxDepth": MAX_DEPTH_DEFAULT, + "maxItems": effectiveLimits.maxItems if effectiveLimits else MAX_ITEMS_DEFAULT, + "maxBytes": effectiveLimits.maxBytes if effectiveLimits else MAX_BYTES_DEFAULT, + "maxFileSize": effectiveLimits.maxFileSize if effectiveLimits else MAX_FILE_SIZE_DEFAULT, + "maxDepth": effectiveLimits.maxDepth if effectiveLimits else MAX_DEPTH_DEFAULT, }, }