fix ragLimits inheritance from connection root to child DataSources
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
parent
a2c5360364
commit
51ac15e501
5 changed files with 78 additions and 24 deletions
|
|
@ -140,6 +140,21 @@ _SOURCE_TYPE_MAP = {
|
|||
}
|
||||
|
||||
|
||||
def _findConnectionRootRagLimits(allDs) -> dict:
    """Return a copy of the connection root's ``ragLimits``, or ``{}`` if unset.

    The connection root is the DataSource record whose ``path`` is ``"/"``.
    A missing, empty or null ``path`` is treated as the root as well.

    Args:
        allDs: Iterable of DataSource records. Each record is either a dict or
            an object whose fields are read through its ``__dict__``.

    Returns:
        A shallow copy of the first non-empty ``settings["ragLimits"]`` dict
        found on a root record, or an empty dict when no root record has one.
    """
    for ds in allDs:
        # Records are only read here, so a view of the attributes is enough.
        dsDict = ds if isinstance(ds, dict) else vars(ds)

        # ``get("path", "")`` hands back None for an explicit null path, which
        # would skip that record; normalise null to "" so it counts as root,
        # just like a record that has no "path" key at all.
        path = dsDict.get("path")
        if path is None:
            path = ""
        if path not in ("/", ""):
            continue

        settings = dsDict.get("settings")
        if not isinstance(settings, dict):
            continue

        limits = settings.get("ragLimits")
        if isinstance(limits, dict) and limits:
            # Copy so callers cannot mutate the stored settings by accident.
            return dict(limits)
    return {}
|
||||
|
||||
|
||||
def _loadRagEnabledDataSources(connectionId: str, dataSourceIds: Optional[list] = None):
|
||||
"""Load DataSource rows whose *effective* ragIndexEnabled is True.
|
||||
|
||||
|
|
@ -161,6 +176,8 @@ def _loadRagEnabledDataSources(connectionId: str, dataSourceIds: Optional[list]
|
|||
|
||||
rootIf = getRootInterface()
|
||||
allDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId})
|
||||
connectionRootLimits = _findConnectionRootRagLimits(allDs)
|
||||
|
||||
resolved = []
|
||||
for ds in allDs:
|
||||
effRagIndex = getEffectiveFlag(ds, "ragIndexEnabled", allDs)
|
||||
|
|
@ -170,6 +187,15 @@ def _loadRagEnabledDataSources(connectionId: str, dataSourceIds: Optional[list]
|
|||
dsCopy["neutralize"] = getEffectiveFlag(ds, "neutralize", allDs)
|
||||
dsCopy["scope"] = getEffectiveFlag(ds, "scope", allDs)
|
||||
dsCopy["ragIndexEnabled"] = True
|
||||
|
||||
if connectionRootLimits:
|
||||
dsSettings = dsCopy.get("settings") or {}
|
||||
if not isinstance(dsSettings, dict):
|
||||
dsSettings = {}
|
||||
ownLimits = dsSettings.get("ragLimits")
|
||||
if not isinstance(ownLimits, dict) or not ownLimits:
|
||||
dsCopy["settings"] = {**dsSettings, "ragLimits": connectionRootLimits}
|
||||
|
||||
resolved.append(dsCopy)
|
||||
if dataSourceIds:
|
||||
resolved = [ds for ds in resolved if ds.get("id") in dataSourceIds]
|
||||
|
|
|
|||
|
|
@ -234,6 +234,7 @@ async def bootstrapClickup(
|
|||
teams = (teamsResp or {}).get("teams") or []
|
||||
|
||||
cancelled = False
|
||||
effectiveLimits = limits
|
||||
for ds in dataSources:
|
||||
if result.indexed + result.skippedDuplicate >= limits.maxTasks:
|
||||
_recordLimitStop(result, "maxTasks", "dataSource", limits)
|
||||
|
|
@ -255,6 +256,7 @@ async def bootstrapClickup(
|
|||
neutralize=dsNeutralize,
|
||||
clickupScope=limits.clickupScope,
|
||||
)
|
||||
effectiveLimits = dsLimits
|
||||
|
||||
if len(teams) > dsLimits.maxWorkspaces:
|
||||
_recordLimitStop(result, "maxWorkspaces", "teams", dsLimits, hard=False)
|
||||
|
|
@ -283,7 +285,7 @@ async def bootstrapClickup(
|
|||
logger.error("clickup team %s walk failed: %s", teamId, exc, exc_info=True)
|
||||
result.errors.append(f"team({teamId}): {exc}")
|
||||
|
||||
finalResult = _finalizeResult(connectionId, result, startMs)
|
||||
finalResult = _finalizeResult(connectionId, result, startMs, effectiveLimits)
|
||||
if cancelled:
|
||||
finalResult["cancelled"] = True
|
||||
return finalResult
|
||||
|
|
@ -574,7 +576,12 @@ def _recordLimitStop(
|
|||
)
|
||||
|
||||
|
||||
def _finalizeResult(connectionId: str, result: ClickupBootstrapResult, startMs: float) -> Dict[str, Any]:
|
||||
def _finalizeResult(
|
||||
connectionId: str,
|
||||
result: ClickupBootstrapResult,
|
||||
startMs: float,
|
||||
effectiveLimits: Optional[ClickupBootstrapLimits] = None,
|
||||
) -> Dict[str, Any]:
|
||||
durationMs = int((time.time() - startMs) * 1000)
|
||||
logger.info(
|
||||
"ingestion.connection.bootstrap.done part=clickup connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d workspaces=%d lists=%d durationMs=%d stoppedAtLimit=%s",
|
||||
|
|
@ -608,9 +615,9 @@ def _finalizeResult(connectionId: str, result: ClickupBootstrapResult, startMs:
|
|||
"errors": result.errors[:20],
|
||||
"stoppedAtLimit": result.stoppedAtLimit,
|
||||
"limits": {
|
||||
"maxTasks": MAX_TASKS_DEFAULT,
|
||||
"maxWorkspaces": MAX_WORKSPACES_DEFAULT,
|
||||
"maxListsPerWorkspace": MAX_LISTS_PER_WORKSPACE_DEFAULT,
|
||||
"maxAgeDays": MAX_AGE_DAYS_DEFAULT,
|
||||
"maxTasks": effectiveLimits.maxTasks if effectiveLimits else MAX_TASKS_DEFAULT,
|
||||
"maxWorkspaces": effectiveLimits.maxWorkspaces if effectiveLimits else MAX_WORKSPACES_DEFAULT,
|
||||
"maxListsPerWorkspace": effectiveLimits.maxListsPerWorkspace if effectiveLimits else MAX_LISTS_PER_WORKSPACE_DEFAULT,
|
||||
"maxAgeDays": effectiveLimits.maxAgeDays if effectiveLimits else MAX_AGE_DAYS_DEFAULT,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -172,6 +172,7 @@ async def bootstrapGdrive(
|
|||
userId = str(getattr(connection, "userId", "") or "") if connection is not None else ""
|
||||
|
||||
cancelled = False
|
||||
effectiveLimits = limits
|
||||
for ds in dataSources:
|
||||
if result.indexed + result.skippedDuplicate >= limits.maxItems:
|
||||
break
|
||||
|
|
@ -193,6 +194,7 @@ async def bootstrapGdrive(
|
|||
maxAgeDays=dsMaxAgeDays,
|
||||
neutralize=dsNeutralize,
|
||||
)
|
||||
effectiveLimits = dsLimits
|
||||
|
||||
try:
|
||||
await _walkFolder(
|
||||
|
|
@ -213,7 +215,7 @@ async def bootstrapGdrive(
|
|||
logger.error("gdrive walk failed for ds %s path %s: %s", dsId, dsPath, exc, exc_info=True)
|
||||
result.errors.append(f"walk({dsPath}): {exc}")
|
||||
|
||||
finalResult = _finalizeResult(connectionId, result, startMs)
|
||||
finalResult = _finalizeResult(connectionId, result, startMs, effectiveLimits)
|
||||
if cancelled:
|
||||
finalResult["cancelled"] = True
|
||||
return finalResult
|
||||
|
|
@ -515,7 +517,12 @@ def _recordLimitStop(
|
|||
)
|
||||
|
||||
|
||||
def _finalizeResult(connectionId: str, result: GdriveBootstrapResult, startMs: float) -> Dict[str, Any]:
|
||||
def _finalizeResult(
|
||||
connectionId: str,
|
||||
result: GdriveBootstrapResult,
|
||||
startMs: float,
|
||||
effectiveLimits: Optional[GdriveBootstrapLimits] = None,
|
||||
) -> Dict[str, Any]:
|
||||
durationMs = int((time.time() - startMs) * 1000)
|
||||
logger.info(
|
||||
"ingestion.connection.bootstrap.done part=gdrive connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d bytes=%d durationMs=%d stoppedAtLimit=%s",
|
||||
|
|
@ -547,9 +554,9 @@ def _finalizeResult(connectionId: str, result: GdriveBootstrapResult, startMs: f
|
|||
"errors": result.errors[:20],
|
||||
"stoppedAtLimit": result.stoppedAtLimit,
|
||||
"limits": {
|
||||
"maxItems": MAX_ITEMS_DEFAULT,
|
||||
"maxBytes": MAX_BYTES_DEFAULT,
|
||||
"maxFileSize": MAX_FILE_SIZE_DEFAULT,
|
||||
"maxDepth": MAX_DEPTH_DEFAULT,
|
||||
"maxItems": effectiveLimits.maxItems if effectiveLimits else MAX_ITEMS_DEFAULT,
|
||||
"maxBytes": effectiveLimits.maxBytes if effectiveLimits else MAX_BYTES_DEFAULT,
|
||||
"maxFileSize": effectiveLimits.maxFileSize if effectiveLimits else MAX_FILE_SIZE_DEFAULT,
|
||||
"maxDepth": effectiveLimits.maxDepth if effectiveLimits else MAX_DEPTH_DEFAULT,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -141,6 +141,7 @@ async def bootstrapKdrive(
|
|||
userId = str(getattr(connection, "userId", "") or "") if connection is not None else ""
|
||||
|
||||
cancelled = False
|
||||
effectiveLimits = limits
|
||||
for ds in dataSources:
|
||||
if result.indexed + result.skippedDuplicate >= limits.maxItems:
|
||||
break
|
||||
|
|
@ -160,6 +161,7 @@ async def bootstrapKdrive(
|
|||
maxDepth=eff.get("maxDepth", limits.maxDepth),
|
||||
neutralize=dsNeutralize,
|
||||
)
|
||||
effectiveLimits = dsLimits
|
||||
|
||||
try:
|
||||
await _walkFolder(
|
||||
|
|
@ -180,7 +182,7 @@ async def bootstrapKdrive(
|
|||
logger.error("kdrive walk failed for ds %s path %s: %s", dsId, dsPath, exc, exc_info=True)
|
||||
result.errors.append(f"walk({dsPath}): {exc}")
|
||||
|
||||
finalResult = _finalizeResult(connectionId, result, startMs)
|
||||
finalResult = _finalizeResult(connectionId, result, startMs, effectiveLimits)
|
||||
if cancelled:
|
||||
finalResult["cancelled"] = True
|
||||
return finalResult
|
||||
|
|
@ -460,7 +462,12 @@ def _recordLimitStop(
|
|||
)
|
||||
|
||||
|
||||
def _finalizeResult(connectionId: str, result: KdriveBootstrapResult, startMs: float) -> Dict[str, Any]:
|
||||
def _finalizeResult(
|
||||
connectionId: str,
|
||||
result: KdriveBootstrapResult,
|
||||
startMs: float,
|
||||
effectiveLimits: Optional[KdriveBootstrapLimits] = None,
|
||||
) -> Dict[str, Any]:
|
||||
durationMs = int((time.time() - startMs) * 1000)
|
||||
logger.info(
|
||||
"ingestion.connection.bootstrap.done part=kdrive connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d durationMs=%d stoppedAtLimit=%s",
|
||||
|
|
@ -484,9 +491,9 @@ def _finalizeResult(connectionId: str, result: KdriveBootstrapResult, startMs: f
|
|||
"errors": result.errors[:20],
|
||||
"stoppedAtLimit": result.stoppedAtLimit,
|
||||
"limits": {
|
||||
"maxItems": MAX_ITEMS_DEFAULT,
|
||||
"maxBytes": MAX_BYTES_DEFAULT,
|
||||
"maxFileSize": MAX_FILE_SIZE_DEFAULT,
|
||||
"maxDepth": MAX_DEPTH_DEFAULT,
|
||||
"maxItems": effectiveLimits.maxItems if effectiveLimits else MAX_ITEMS_DEFAULT,
|
||||
"maxBytes": effectiveLimits.maxBytes if effectiveLimits else MAX_BYTES_DEFAULT,
|
||||
"maxFileSize": effectiveLimits.maxFileSize if effectiveLimits else MAX_FILE_SIZE_DEFAULT,
|
||||
"maxDepth": effectiveLimits.maxDepth if effectiveLimits else MAX_DEPTH_DEFAULT,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -168,6 +168,7 @@ async def bootstrapSharepoint(
|
|||
userId = str(getattr(connection, "userId", "") or "") if connection is not None else ""
|
||||
|
||||
cancelled = False
|
||||
effectiveLimits = limits
|
||||
for ds in dataSources:
|
||||
if result.indexed + result.skippedDuplicate >= limits.maxItems:
|
||||
break
|
||||
|
|
@ -188,6 +189,7 @@ async def bootstrapSharepoint(
|
|||
maxSites=limits.maxSites,
|
||||
neutralize=dsNeutralize,
|
||||
)
|
||||
effectiveLimits = dsLimits
|
||||
|
||||
try:
|
||||
await _walkFolder(
|
||||
|
|
@ -208,7 +210,7 @@ async def bootstrapSharepoint(
|
|||
logger.error("sharepoint walk failed for ds %s path %s: %s", dsId, dsPath, exc, exc_info=True)
|
||||
result.errors.append(f"walk({dsPath}): {exc}")
|
||||
|
||||
finalResult = _finalizeResult(connectionId, result, startMs)
|
||||
finalResult = _finalizeResult(connectionId, result, startMs, effectiveLimits)
|
||||
if cancelled:
|
||||
finalResult["cancelled"] = True
|
||||
return finalResult
|
||||
|
|
@ -505,7 +507,12 @@ def _recordLimitStop(
|
|||
)
|
||||
|
||||
|
||||
def _finalizeResult(connectionId: str, result: SharepointBootstrapResult, startMs: float) -> Dict[str, Any]:
|
||||
def _finalizeResult(
|
||||
connectionId: str,
|
||||
result: SharepointBootstrapResult,
|
||||
startMs: float,
|
||||
effectiveLimits: Optional[SharepointBootstrapLimits] = None,
|
||||
) -> Dict[str, Any]:
|
||||
durationMs = int((time.time() - startMs) * 1000)
|
||||
logger.info(
|
||||
"ingestion.connection.bootstrap.done part=sharepoint connectionId=%s indexed=%d skippedDup=%d skippedPolicy=%d failed=%d durationMs=%d stoppedAtLimit=%s",
|
||||
|
|
@ -535,9 +542,9 @@ def _finalizeResult(connectionId: str, result: SharepointBootstrapResult, startM
|
|||
"errors": result.errors[:20],
|
||||
"stoppedAtLimit": result.stoppedAtLimit,
|
||||
"limits": {
|
||||
"maxItems": MAX_ITEMS_DEFAULT,
|
||||
"maxBytes": MAX_BYTES_DEFAULT,
|
||||
"maxFileSize": MAX_FILE_SIZE_DEFAULT,
|
||||
"maxDepth": MAX_DEPTH_DEFAULT,
|
||||
"maxItems": effectiveLimits.maxItems if effectiveLimits else MAX_ITEMS_DEFAULT,
|
||||
"maxBytes": effectiveLimits.maxBytes if effectiveLimits else MAX_BYTES_DEFAULT,
|
||||
"maxFileSize": effectiveLimits.maxFileSize if effectiveLimits else MAX_FILE_SIZE_DEFAULT,
|
||||
"maxDepth": effectiveLimits.maxDepth if effectiveLimits else MAX_DEPTH_DEFAULT,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue