233 lines
9.8 KiB
Python
233 lines
9.8 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""DataSource auxiliary endpoints: settings (ragLimits) and cost estimate.
|
|
|
|
Flag toggles (neutralize / scope / ragIndexEnabled) have moved to the
|
|
generic UDB router (`POST /api/udb/node/{key}/flag/{flag}`); see
|
|
`modules/routes/routeUdb.py` and the wiki UDB reference page.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Path, Request, Body
|
|
from modules.auth import limiter, getRequestContext, RequestContext
|
|
from modules.datamodels.datamodelDataSource import DataSource
|
|
from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource
|
|
from modules.datamodels.datamodelUam import UserConnection
|
|
from modules.shared.i18nRegistry import apiRouteContext
|
|
# Route-scoped message context obtained from the i18n registry under the key
# "routeDataSources". NOTE(review): not referenced anywhere in the code visible
# here; presumably kept for a registration side effect of apiRouteContext —
# confirm before removing.
routeApiMsg = apiRouteContext("routeDataSources")

# Module-level logger shared by the helpers and routes below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _ensureConnectionKnowledgeFlag(rootIf, connectionId: str) -> None:
    """Best-effort, forward-only sync of a UserConnection's knowledge consent flag.

    When a DataSource becomes RAG-active, its parent UserConnection must have
    ``knowledgeIngestionEnabled`` set. This helper only ever turns the flag ON;
    it never clears it. Deactivating the last DataSource intentionally leaves
    the flag alone, because consent may have been granted explicitly via the
    Connections page / wizard before any DataSource existed. Only the master
    switch (`/knowledge-consent`) may clear it.

    Args:
        rootIf: Root DB interface; ``rootIf.db`` provides ``getRecord`` and
            ``recordModify``.
        connectionId: ID of the parent UserConnection. Empty / falsy values
            make this a no-op.

    Nothing is written when the connection does not exist or already has a
    truthy ``knowledgeIngestionEnabled``. Any exception raised while reading or
    writing is logged as a warning and swallowed, so the caller is never
    interrupted by this sync.
    """
    if not connectionId:
        return

    try:
        connection = rootIf.db.getRecord(UserConnection, connectionId)
        # Only act on an existing connection whose flag is not yet set.
        if connection and not connection.get("knowledgeIngestionEnabled"):
            rootIf.db.recordModify(UserConnection, connectionId, {"knowledgeIngestionEnabled": True})
            logger.info(
                "Auto-enabled knowledgeIngestionEnabled on UserConnection %s "
                "(triggered by first active DataSource).",
                connectionId,
            )
    except Exception as exc:
        logger.warning("Could not auto-enable knowledgeIngestionEnabled for connection %s: %s", connectionId, exc)
|
|
|
|
# Router for the auxiliary DataSource endpoints in this module. Every route is
# mounted under /api/datasources and advertises the same generic error
# responses in the OpenAPI schema (kept in this order: 404, 400, 401, 403, 500).
router = APIRouter(
    tags=["Data Sources"],
    prefix="/api/datasources",
    responses={
        statusCode: {"description": description}
        for statusCode, description in (
            (404, "Not found"),
            (400, "Bad request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (500, "Internal server error"),
        )
    },
)
|
|
|
|
def _findSourceRecord(db, sourceId: str):
    """Resolve ``sourceId`` to a ``(record, model)`` pair.

    The DataSource table is consulted first, then FeatureDataSource. The
    first truthy record found wins, and the model class it came from is
    returned alongside it. ``(None, None)`` means neither table had it.
    """
    for candidateModel in (DataSource, FeatureDataSource):
        found = db.getRecord(candidateModel, sourceId)
        if found:
            return found, candidateModel
    return None, None
|
|
|
|
|
|
# DataSource.sourceType values that use the ClickUp RAG-limits kind; every
# other DataSource sourceType falls back to the "files" kind.
_CLICKUP_SOURCE_TYPES = {
    "clickup",
    "clickupList",
    "clickupSpace",
    "clickupFolder",
}

# Whitelist of accepted ragLimits keys per RAG-limits kind. Keys outside the
# set for a source's kind are dropped when settings are sanitized.
_ALLOWED_RAG_LIMIT_KEYS = dict(
    files={"maxItems", "maxBytes", "maxFileSize", "maxDepth"},
    clickup={"maxTasks", "maxWorkspaces", "maxListsPerWorkspace"},
)
|
|
|
|
|
|
def _kindForSource(rec: Dict[str, Any], model) -> str:
    """Classify a source record into its RAG-limits kind.

    Returns ``"clickup"`` for a DataSource whose whitespace-stripped
    ``sourceType`` is listed in ``_CLICKUP_SOURCE_TYPES``, and ``"files"``
    otherwise. FeatureDataSource records (tables, not file walkers) always
    report ``"files"`` so the same UI/limit shape works for them. No walker
    consumes those limits today; they are stored for forward compatibility.
    """
    if model is not FeatureDataSource:
        rawType = rec.get("sourceType") or ""
        if str(rawType).strip() in _CLICKUP_SOURCE_TYPES:
            return "clickup"
    return "files"
|
|
|
|
|
|
def _sanitizeRagLimits(kind: str, raw: Any) -> Dict[str, int]:
    """Coerce an incoming ragLimits dict to ``{allowedKey: positive int}``.

    Args:
        kind: RAG-limits kind (``"files"`` or ``"clickup"``). It selects the key
            whitelist from ``_ALLOWED_RAG_LIMIT_KEYS``. An unknown kind allows
            no keys, so every entry is dropped.
        raw: The client-supplied ``ragLimits`` value.

    Returns:
        A new dict with only whitelisted keys, each mapped to a strictly
        positive int. Unknown keys are silently dropped.

    Raises:
        HTTPException: 400 when ``raw`` is not a dict. Also 400 when a
            whitelisted key has a value that is not an integer or is <= 0.
            Booleans, fractional or non-finite floats, and non-numeric
            strings count as "not an integer". Integral floats such as
            ``100.0`` and numeric strings such as ``"100"`` are accepted.
    """
    if not isinstance(raw, dict):
        raise HTTPException(status_code=400, detail="ragLimits must be an object")
    allowed = _ALLOWED_RAG_LIMIT_KEYS.get(kind, set())
    cleaned: Dict[str, int] = {}
    for key, value in raw.items():
        if key not in allowed:
            continue
        # Reject values that int() would silently accept or mangle:
        # bool is an int subclass (int(True) == 1), and fractional floats
        # would be truncated (2.5 -> 2). inf/nan are not integral either,
        # so they end up here instead of making int() raise.
        if isinstance(value, bool) or (isinstance(value, float) and not value.is_integer()):
            raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be an integer")
        try:
            intValue = int(value)
        except (TypeError, ValueError, OverflowError):
            # OverflowError is included so no exotic numeric input can fall
            # through to the caller's generic 500 handler.
            raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be an integer") from None
        if intValue <= 0:
            raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be > 0")
        cleaned[key] = intValue
    return cleaned
|
|
|
|
|
|
@router.patch("/{sourceId}/settings")
@limiter.limit("30/minute")
def _updateDataSourceSettings(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    settings: Dict[str, Any] = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Merge ``settings`` into a DataSource's or FeatureDataSource's stored settings.

    Only ``ragLimits`` is supported. Any other top-level key is rejected with
    400 to avoid silently storing garbage that no consumer reads. Stored
    top-level keys absent from the request are kept. ``ragLimits`` itself is
    merged key by key: submitted limits overwrite stored ones, and limits not
    submitted keep their stored value. There is currently no way to remove a
    single limit through this endpoint.

    Authorization:
      * Sysadmins may always modify.
      * DataSource: the owner (``userId``) may modify. Anyone else must be a
        mandate admin, and the source's effective ``scope`` must not be
        ``"personal"``. A DataSource without ``userId`` has no owner, so it
        falls under the mandate-admin rule.
      * FeatureDataSource (no userId/scope): the caller needs a feature-admin
        role on the record's ``featureInstanceId``.

    Args:
        request: Incoming request (required by the rate limiter).
        sourceId: ID of the DataSource or FeatureDataSource.
        settings: Partial settings object; currently only ``ragLimits``.
        context: Authenticated request context.

    Returns:
        ``{"sourceId": ..., "settings": <stored settings>, "updated": True}``.

    Raises:
        HTTPException: 400 for malformed input, 403 when not authorized, 404
            when no source has this ID, 500 for any unexpected error.
    """
    if not isinstance(settings, dict):
        raise HTTPException(status_code=400, detail="settings must be an object")
    unknown = set(settings.keys()) - {"ragLimits"}
    if unknown:
        raise HTTPException(status_code=400, detail=f"Unknown settings keys: {sorted(unknown)}")

    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")

        currentUserId = str(context.user.id)
        _assertCanModifySourceSettings(rootIf, context, rec, model, currentUserId)

        currentSettings = rec.get("settings") or {}
        if not isinstance(currentSettings, dict):
            currentSettings = {}
        # Shallow copy: currentSettings stays untouched for the audit record.
        newSettings = dict(currentSettings)

        if "ragLimits" in settings:
            kind = _kindForSource(rec, model)
            cleanedLimits = _sanitizeRagLimits(kind, settings["ragLimits"])
            storedLimits = currentSettings.get("ragLimits")
            # A malformed (non-dict) stored value is treated like "no limits",
            # mirroring how non-dict `settings` is handled above.
            mergedLimits = dict(storedLimits) if isinstance(storedLimits, dict) else {}
            mergedLimits.update(cleanedLimits)
            newSettings["ragLimits"] = mergedLimits

        rootIf.db.recordModify(model, sourceId, {"settings": newSettings})
        # NOTE(review): the write above is already persisted. An audit failure
        # below still turns this request into a 500.
        _auditSettingsChange(context, currentUserId, sourceId, model, currentSettings, newSettings)

        logger.info("Updated settings on %s %s by user %s", model.__name__, sourceId, currentUserId)
        return {"sourceId": sourceId, "settings": newSettings, "updated": True}
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error updating datasource settings: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


def _assertCanModifySourceSettings(
    rootIf, context: RequestContext, rec: Dict[str, Any], model, currentUserId: str
) -> None:
    """Raise HTTPException(403) unless the caller may change settings on ``rec``."""
    if context.isSysAdmin:
        return

    if model is DataSource:
        ownerId = str(rec.get("userId") or "")
        # An empty ownerId means "no owner": it must not grant access to
        # everyone, so it falls through to the mandate-admin rule.
        if ownerId and ownerId == currentUserId:
            return
        # Check the cheap in-memory flag before walking the DataSource tree.
        if not getattr(context, "isMandateAdmin", False):
            raise HTTPException(status_code=403, detail="Not allowed to modify this DataSource's settings")
        from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag
        connectionId = rec.get("connectionId", "")
        allDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId})
        scope = str(getEffectiveFlag(rec, "scope", allDs, mode="walk"))
        # NOTE(review): this does not compare the record's mandate with
        # context.mandateId; confirm cross-mandate access is excluded elsewhere.
        if scope == "personal":
            raise HTTPException(status_code=403, detail="Not allowed to modify this DataSource's settings")
        return

    from modules.serviceCenter.services.serviceKnowledge.udbNodes import _isFeatureAdmin
    featureInstanceId = str(rec.get("featureInstanceId") or "")
    if not _isFeatureAdmin(rootIf, currentUserId, featureInstanceId):
        raise HTTPException(status_code=403, detail="Not allowed to modify this FeatureDataSource's settings")


def _auditSettingsChange(
    context: RequestContext,
    currentUserId: str,
    sourceId: str,
    model,
    oldSettings: Dict[str, Any],
    newSettings: Dict[str, Any],
) -> None:
    """Emit a PERMISSION-category audit event recording old and new settings."""
    import json
    from modules.shared.auditLogger import audit_logger
    from modules.datamodels.datamodelAudit import AuditCategory
    audit_logger.logEvent(
        userId=currentUserId,
        mandateId=context.mandateId,
        category=AuditCategory.PERMISSION.value,
        action="datasource_settings_changed",
        details=json.dumps({
            "sourceId": sourceId,
            "model": model.__name__,
            "oldSettings": oldSettings,
            "newSettings": newSettings,
        }),
    )
|
|
|
|
|
|
@router.get("/{sourceId}/cost-estimate")
@limiter.limit("60/minute")
def _getDataSourceCostEstimate(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Return an indicative cost estimate for a full (bootstrap) sync of one source.

    The estimate is computed from the source's effective ragLimits, as
    resolved by ``_ragLimits.getRagLimits`` for the source's RAG-limits kind.
    The original contract describes these as DataSource.settings.ragLimits
    with fallback to centralized defaults. The result has the
    ``{estimatedTokens, estimatedChf, basis}`` shape for every kind, plus the
    requested ``sourceId``.

    NOTE(review): no authorization beyond authentication is performed here;
    any authenticated caller who knows the ID can query it.

    Raises:
        HTTPException: 404 when no DataSource/FeatureDataSource has this ID,
            500 for any unexpected error.
    """
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.serviceCenter.services.serviceKnowledge import _ragLimits, _costEstimate

        record, recordModel = _findSourceRecord(getRootInterface().db, sourceId)
        if not record:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")

        sourceKind = _kindForSource(record, recordModel)
        result = _costEstimate.estimateBootstrapCost(
            _ragLimits.getRagLimits(record, sourceKind),
            kind=sourceKind,
        )
        result["sourceId"] = sourceId
        return result
    except HTTPException:
        raise
    except Exception as exc:
        logger.error("Error computing cost estimate: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
|