# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""DataSource auxiliary endpoints: settings (ragLimits) and cost estimate.

Flag toggles (neutralize / ragIndexEnabled) have moved to the generic UDB
router (`POST /api/udb/node/{key}/flag/{flag}`); see
`modules/routes/routeUdb.py` and the wiki UDB reference page.
"""

import json
import logging
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, HTTPException, Depends, Path, Request, Body

from modules.auth import limiter, getRequestContext, RequestContext
from modules.datamodels.datamodelDataSource import DataSource
from modules.datamodels.datamodelFeatures import FeatureDataSource
from modules.datamodels.datamodelUam import UserConnection
from modules.shared.i18nRegistry import apiRouteContext

routeApiMsg = apiRouteContext("routeDataSources")

logger = logging.getLogger(__name__)


def _ensureConnectionKnowledgeFlag(rootIf, connectionId: str) -> None:
    """Forward-only sync of the parent connection's knowledge-ingestion flag.

    If a DataSource gets RAG-activated, ensure the parent
    UserConnection.knowledgeIngestionEnabled is true.

    Intentionally NOT bidirectional: disabling the last DataSource does NOT
    auto-clear knowledgeIngestionEnabled, because the consent flag may have
    been set explicitly via the Connections page / wizard even before any
    DataSource exists. Only the master switch (`/knowledge-consent`) may
    clear it.

    Args:
        rootIf: Root interface object; only its ``db`` attribute
            (``getRecord`` / ``recordModify``) is used.
        connectionId: ID of the UserConnection to update. A falsy value makes
            the call a no-op.

    Best-effort by design: any exception is logged as a warning and
    swallowed so the caller's main operation is never aborted by this sync.
    """
    if not connectionId:
        return
    try:
        currentConn = rootIf.db.getRecord(UserConnection, connectionId)
        if not currentConn:
            return
        # Already enabled: avoid a redundant write (and a misleading log line).
        if bool(currentConn.get("knowledgeIngestionEnabled")):
            return
        rootIf.db.recordModify(UserConnection, connectionId, {"knowledgeIngestionEnabled": True})
        logger.info(
            "Auto-enabled knowledgeIngestionEnabled on UserConnection %s "
            "(triggered by first active DataSource).",
            connectionId,
        )
    except Exception as e:
        logger.warning("Could not auto-enable knowledgeIngestionEnabled for connection %s: %s", connectionId, e)


router = APIRouter(
    prefix="/api/datasources",
    tags=["Data Sources"],
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden"},
        500: {"description": "Internal server error"},
    },
)


def _findSourceRecord(db, sourceId: str):
    """Look up a source by ID, checking DataSource first, then FeatureDataSource.

    Returns:
        A ``(record, modelClass)`` tuple for the first model that yields a
        truthy record, or ``(None, None)`` when neither model has one.
    """
    rec = db.getRecord(DataSource, sourceId)
    if rec:
        return rec, DataSource
    rec = db.getRecord(FeatureDataSource, sourceId)
    if rec:
        return rec, FeatureDataSource
    return None, None


_CLICKUP_SOURCE_TYPES = {"clickup", "clickupList", "clickupSpace", "clickupFolder"}

# Per-kind whitelist of ragLimits keys that may be stored via the settings endpoint.
_ALLOWED_RAG_LIMIT_KEYS = {
    "files": {"maxItems", "maxBytes", "maxFileSize", "maxDepth"},
    "clickup": {"maxTasks", "maxWorkspaces", "maxListsPerWorkspace"},
}


def _kindForSource(rec: Dict[str, Any], model) -> str:
    """Map a DataSource record to a RAG-limits kind ('files' or 'clickup').

    FeatureDataSource (tables, not file walkers) reports as 'files' so the
    same UI/limit shape works; the limits simply won't be consumed by any
    walker today but are stored for forward-compat.
    """
    if model is FeatureDataSource:
        return "files"
    sourceType = str(rec.get("sourceType") or "").strip()
    return "clickup" if sourceType in _CLICKUP_SOURCE_TYPES else "files"


def _coercePositiveInt(key: str, value: Any) -> int:
    """Coerce a single ragLimits value to a positive int or raise HTTP 400.

    Accepted: ints, integral floats (``5.0``) and integer-looking strings
    (``"5"``). Rejected with "must be an integer": booleans, non-integral
    floats, NaN / infinity and anything ``int()`` cannot parse. Rejected
    with "must be > 0": zero and negative numbers.
    """
    notIntegerDetail = f"ragLimits.{key} must be an integer"
    # bool is a subclass of int, so int(True) == 1 would silently store a
    # limit of 1 for a JSON `true`; treat booleans as non-numeric instead.
    if isinstance(value, bool):
        raise HTTPException(status_code=400, detail=notIntegerDetail)
    try:
        intValue = int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); the JSON parser accepts `Infinity`.
        raise HTTPException(status_code=400, detail=notIntegerDetail) from None
    # int() truncates floats (1.9 -> 1); refuse rather than store a value the
    # client did not send.
    if isinstance(value, float) and value != intValue:
        raise HTTPException(status_code=400, detail=notIntegerDetail)
    if intValue <= 0:
        raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be > 0")
    return intValue


def _sanitizeRagLimits(kind: str, raw: Any) -> Dict[str, int]:
    """Coerce an incoming ragLimits dict to {allowedKey: positive int}.

    Unknown keys (not whitelisted for ``kind``) are silently dropped;
    non-positive or non-numeric values are rejected with 400. Booleans and
    non-integral floats count as non-numeric here (see `_coercePositiveInt`).

    Raises:
        HTTPException: 400 when ``raw`` is not a dict or an allowed key
            carries an invalid value.
    """
    if not isinstance(raw, dict):
        raise HTTPException(status_code=400, detail="ragLimits must be an object")
    allowed = _ALLOWED_RAG_LIMIT_KEYS.get(kind, set())
    return {
        key: _coercePositiveInt(key, value)
        for key, value in raw.items()
        if key in allowed
    }


@router.patch("/{sourceId}/settings")
@limiter.limit("30/minute")
def _updateDataSourceSettings(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    settings: Dict[str, Any] = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Replace `settings` on a DataSource or FeatureDataSource (partial merge per top-level key).

    Currently supports `ragLimits` only. Unknown top-level keys in the body
    are rejected to avoid silently storing garbage that no consumer reads.
    Submitted ragLimits keys are merged over the stored ones; stored keys
    that are not part of the request are kept.

    DataSource: owner-only (or sysadmin). FeatureDataSource requires a
    feature-admin role on the FDS's featureInstanceId.

    Returns:
        ``{"sourceId": ..., "settings": <merged settings>, "updated": True}``.

    Raises:
        HTTPException: 400 for a malformed body, 403 when the caller may not
            modify the source, 404 when the source does not exist, 500 for
            any unexpected error.
    """
    if not isinstance(settings, dict):
        raise HTTPException(status_code=400, detail="settings must be an object")
    unknown = set(settings.keys()) - {"ragLimits"}
    if unknown:
        raise HTTPException(status_code=400, detail=f"Unknown settings keys: {sorted(unknown)}")

    try:
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")

        currentUserId = str(context.user.id)
        if model is DataSource:
            ownerId = str(rec.get("userId") or "")
            # NOTE(review): a record without userId passes this check for any
            # caller — presumably intentional for ownerless records; confirm.
            if ownerId and ownerId != currentUserId and not context.isSysAdmin:
                raise HTTPException(status_code=403, detail="Not allowed to modify this DataSource's settings")
        else:
            from modules.serviceCenter.services.serviceKnowledge.udbNodes import isFeatureAdmin

            featureInstanceId = str(rec.get("featureInstanceId") or "")
            if not (context.isSysAdmin or isFeatureAdmin(rootIf, currentUserId, featureInstanceId)):
                raise HTTPException(status_code=403, detail="Not allowed to modify this FeatureDataSource's settings")

        kind = _kindForSource(rec, model)
        currentSettings = rec.get("settings") or {}
        if not isinstance(currentSettings, dict):
            currentSettings = {}
        # Shallow copy so `currentSettings` stays untouched for the audit trail.
        newSettings = dict(currentSettings)

        if "ragLimits" in settings:
            cleanedLimits = _sanitizeRagLimits(kind, settings["ragLimits"])
            storedLimits = currentSettings.get("ragLimits")
            # A stored non-dict value (corrupt / legacy data) is discarded
            # instead of crashing dict() and turning the request into a 500.
            mergedLimits = dict(storedLimits) if isinstance(storedLimits, dict) else {}
            mergedLimits.update(cleanedLimits)
            newSettings["ragLimits"] = mergedLimits

        rootIf.db.recordModify(model, sourceId, {"settings": newSettings})

        from modules.dbHelpers.auditLogger import audit_logger
        from modules.datamodels.datamodelAudit import AuditCategory

        audit_logger.logEvent(
            userId=currentUserId,
            mandateId=context.mandateId,
            category=AuditCategory.PERMISSION.value,
            action="datasource_settings_changed",
            details=json.dumps({
                "sourceId": sourceId,
                "model": model.__name__,
                "oldSettings": currentSettings,
                "newSettings": newSettings,
            }),
        )

        logger.info("Updated settings on %s %s by user %s", model.__name__, sourceId, currentUserId)
        return {"sourceId": sourceId, "settings": newSettings, "updated": True}
    except HTTPException:
        # Deliberate 4xx responses raised above must not be rewrapped as 500.
        raise
    except Exception as e:
        logger.error("Error updating datasource settings: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.get("/{sourceId}/cost-estimate")
@limiter.limit("60/minute")
def _getDataSourceCostEstimate(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Return an indicative full-sync cost estimate for the given DataSource.

    Uses the current effective ragLimits (DataSource.settings.ragLimits with
    fallback to centralized defaults) as the basis. Returns the same
    `{estimatedTokens, estimatedChf, basis}` shape regardless of source kind,
    with `sourceId` added to the result.

    Raises:
        HTTPException: 404 when the source does not exist, 500 for any
            unexpected error.
    """
    # NOTE(review): unlike the settings endpoint, no owner / feature-admin
    # check is performed here; `context` only enforces whatever
    # getRequestContext itself requires. Confirm this is intended.
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.serviceCenter.services.serviceKnowledge import ragLimits, costEstimate

        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")

        kind = _kindForSource(rec, model)
        effective = ragLimits.getRagLimits(rec, kind)
        estimate = costEstimate.estimateBootstrapCost(effective, kind=kind)
        estimate["sourceId"] = sourceId
        return estimate
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error computing cost estimate: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e