# Copyright (c) 2025 Patrick Motsch # All rights reserved. """PATCH endpoints for DataSource and FeatureDataSource scope/neutralize/rag-index tagging.""" import logging from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException, Depends, Path, Request, Body from modules.auth import limiter, getRequestContext, RequestContext from modules.datamodels.datamodelDataSource import DataSource from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource from modules.datamodels.datamodelUam import UserConnection from modules.shared.i18nRegistry import apiRouteContext routeApiMsg = apiRouteContext("routeDataSources") logger = logging.getLogger(__name__) def _ensureConnectionKnowledgeFlag(rootIf, connectionId: str) -> None: """Forward-only sync: if a DataSource gets RAG-activated, ensure the parent UserConnection.knowledgeIngestionEnabled is true. Intentionally NOT bidirectional: disabling the last DataSource does NOT auto-clear knowledgeIngestionEnabled, because the consent flag may have been set explicitly via the Connections page / wizard even before any DataSource exists. Only the master switch (`/knowledge-consent`) may clear it. """ if not connectionId: return try: currentConn = rootIf.db.getRecord(UserConnection, connectionId) if not currentConn: return if bool(currentConn.get("knowledgeIngestionEnabled")): return rootIf.db.recordModify(UserConnection, connectionId, {"knowledgeIngestionEnabled": True}) logger.info( "Auto-enabled knowledgeIngestionEnabled on UserConnection %s " "(triggered by first active DataSource).", connectionId, ) except Exception as e: logger.warning("Could not auto-enable knowledgeIngestionEnabled for connection %s: %s", connectionId, e) def _computeOwnEffective(rootIf, rec, model, sourceId: str, flag: str) -> Any: """Re-load the record after modification and compute its aggregate effective value.""" from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( getEffectiveFlag, getEffectiveFlagFds, ) freshRec = rootIf.db.getRecord(model, sourceId) if not freshRec: return None if model is DataSource: connectionId = freshRec.get("connectionId", "") allDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId}) return getEffectiveFlag(freshRec, flag, allDs, mode="aggregate") else: wsId = freshRec.get("workspaceInstanceId", "") allFds = rootIf.db.getRecordset(FeatureDataSource, recordFilter={"workspaceInstanceId": wsId}) return getEffectiveFlagFds(freshRec, flag, allFds, mode="aggregate") def _computeAncestorEffectives(rootIf, rec, model, flag: str) -> List[Dict[str, Any]]: """Compute the aggregate effective value for all ancestors of `rec`.""" from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( collectAncestorChain, collectAncestorChainFds, getEffectiveFlag, getEffectiveFlagFds, ) effectiveKey = f"effective{flag[0].upper()}{flag[1:]}" if model is DataSource: connectionId = rec.get("connectionId", "") allDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId}) ancestors = collectAncestorChain(rec, allDs) return [ {"id": a.get("id") or getattr(a, "id", ""), effectiveKey: getEffectiveFlag(a, flag, allDs, mode="aggregate")} for a in ancestors ] else: wsId = rec.get("workspaceInstanceId", "") allFds = rootIf.db.getRecordset(FeatureDataSource, recordFilter={"workspaceInstanceId": wsId}) ancestors = collectAncestorChainFds(rec, allFds) return [ {"id": a.get("id") or getattr(a, "id", ""), effectiveKey: getEffectiveFlagFds(a, flag, allFds, mode="aggregate")} for a in ancestors ] router = APIRouter( prefix="/api/datasources", tags=["Data Sources"], responses={ 404: {"description": "Not found"}, 400: {"description": "Bad request"}, 401: {"description": "Unauthorized"}, 403: {"description": "Forbidden"}, 500: {"description": "Internal server error"}, }, ) _VALID_SCOPES = {"personal", "featureInstance", "mandate", "global"} def _findSourceRecord(db, sourceId: str): """Look up a source by ID, checking DataSource first, then FeatureDataSource.""" rec = db.getRecord(DataSource, sourceId) if rec: return rec, DataSource rec = db.getRecord(FeatureDataSource, sourceId) if rec: return rec, FeatureDataSource return None, None @router.patch("/{sourceId}/scope") @limiter.limit("30/minute") def _updateDataSourceScope( request: Request, sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"), scope: Optional[str] = Body(None, embed=True), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: """Update the scope of a DataSource. Cascade-resets explicit descendants. `scope=None` resets this node to inherit (no cascade). Global scope requires sysAdmin. """ if scope is not None: if scope not in _VALID_SCOPES: raise HTTPException(status_code=400, detail=f"Invalid scope: {scope}. Must be one of {_VALID_SCOPES}") if scope == "global" and not context.isSysAdmin: raise HTTPException(status_code=403, detail=routeApiMsg("Only sysadmins can set global scope")) try: from modules.interfaces.interfaceDbApp import getRootInterface from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( cascadeResetDescendants, cascadeResetDescendantsFds, getEffectiveFlag, getEffectiveFlagFds, collectAncestorChain, collectAncestorChainFds, ) rootIf = getRootInterface() rec, model = _findSourceRecord(rootIf.db, sourceId) if not rec: raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found") # 1. Cascade reset descendants bottom-up (before modifying master) resetIds: List[str] = [] if scope is not None: if model is DataSource: resetIds = cascadeResetDescendants(rootIf, rec, "scope") else: resetIds = cascadeResetDescendantsFds(rootIf, rec, "scope") # 2. Set master value last (crash-safe) rootIf.db.recordModify(model, sourceId, {"scope": scope}) # 3. Compute effective + ancestor chain for response updatedAncestors = _computeAncestorEffectives(rootIf, rec, model, "scope") effectiveScope = _computeOwnEffective(rootIf, rec, model, sourceId, "scope") logger.info( "Updated scope=%s for %s %s (cascade-reset %d descendants)", scope, model.__name__, sourceId, len(resetIds), ) return { "sourceId": sourceId, "scope": scope, "effectiveScope": effectiveScope, "resetDescendantIds": resetIds, "updatedAncestors": updatedAncestors, } except HTTPException: raise except Exception as e: logger.error("Error updating datasource scope: %s", e) raise HTTPException(status_code=500, detail=str(e)) @router.patch("/{sourceId}/neutralize") @limiter.limit("30/minute") def _updateDataSourceNeutralize( request: Request, sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"), neutralize: Optional[bool] = Body(None, embed=True), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: """Set neutralize flag on a DataSource. Cascade-resets explicit descendants. `neutralize=None` resets this node to inherit (no cascade). """ try: from modules.interfaces.interfaceDbApp import getRootInterface from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( cascadeResetDescendants, cascadeResetDescendantsFds, ) rootIf = getRootInterface() rec, model = _findSourceRecord(rootIf.db, sourceId) if not rec: raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found") # 1. Cascade reset descendants bottom-up (before modifying master) resetIds: List[str] = [] if neutralize is not None: if model is DataSource: resetIds = cascadeResetDescendants(rootIf, rec, "neutralize") else: resetIds = cascadeResetDescendantsFds(rootIf, rec, "neutralize") # 2. Set master value last (crash-safe) rootIf.db.recordModify(model, sourceId, {"neutralize": neutralize}) # 3. Compute effective + ancestor chain for response updatedAncestors = _computeAncestorEffectives(rootIf, rec, model, "neutralize") effectiveNeutralize = _computeOwnEffective(rootIf, rec, model, sourceId, "neutralize") logger.info( "Updated neutralize=%s for %s %s (cascade-reset %d descendants)", neutralize, model.__name__, sourceId, len(resetIds), ) return { "sourceId": sourceId, "neutralize": neutralize, "effectiveNeutralize": effectiveNeutralize, "resetDescendantIds": resetIds, "updatedAncestors": updatedAncestors, } except HTTPException: raise except Exception as e: logger.error("Error updating datasource neutralize: %s", e) raise HTTPException(status_code=500, detail=str(e)) @router.patch("/{sourceId}/neutralize-fields") @limiter.limit("30/minute") def _updateNeutralizeFields( request: Request, sourceId: str = Path(..., description="ID of the FeatureDataSource"), neutralizeFields: List[str] = Body(..., embed=True), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: """Update the list of field names to neutralize on a FeatureDataSource.""" try: from modules.interfaces.interfaceDbApp import getRootInterface rootIf = getRootInterface() rec = rootIf.db.getRecord(FeatureDataSource, sourceId) if not rec: raise HTTPException(status_code=404, detail=f"FeatureDataSource {sourceId} not found") cleanFields = [f for f in neutralizeFields if f and isinstance(f, str)] if neutralizeFields else [] rootIf.db.recordModify(FeatureDataSource, sourceId, { "neutralizeFields": cleanFields if cleanFields else None, }) logger.info("Updated neutralizeFields=%s for FeatureDataSource %s", cleanFields, sourceId) return {"sourceId": sourceId, "neutralizeFields": cleanFields, "updated": True} except HTTPException: raise except Exception as e: logger.error("Error updating neutralizeFields: %s", e) raise HTTPException(status_code=500, detail=str(e)) @router.patch("/{sourceId}/rag-index") @limiter.limit("30/minute") async def _updateDataSourceRagIndex( request: Request, sourceId: str = Path(..., description="ID of the DataSource"), ragIndexEnabled: Optional[bool] = Body(None, embed=True), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: """Set RAG indexing flag on a DataSource. Cascade-resets explicit descendants. `ragIndexEnabled=None` resets this node to inherit (no cascade, no purge, no bootstrap — the node simply follows its ancestor chain afterwards). `True` enqueues a mini-bootstrap. `False` synchronously purges chunks. Must be `async def` so `await startJob(...)` registers `_runJob` in the main event loop. """ try: from modules.interfaces.interfaceDbApp import getRootInterface from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( cascadeResetDescendants, cascadeResetDescendantsFds, ) rootIf = getRootInterface() rec, model = _findSourceRecord(rootIf.db, sourceId) if not rec: raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found") # 1. Cascade reset descendants bottom-up (before modifying master) resetIds: List[str] = [] if ragIndexEnabled is not None: if model is DataSource: resetIds = cascadeResetDescendants(rootIf, rec, "ragIndexEnabled") else: resetIds = cascadeResetDescendantsFds(rootIf, rec, "ragIndexEnabled") # 2. Set master value last (crash-safe) rootIf.db.recordModify(model, sourceId, {"ragIndexEnabled": ragIndexEnabled}) logger.info( "Updated ragIndexEnabled=%s for %s %s (cascade-reset %d descendants)", ragIndexEnabled, model.__name__, sourceId, len(resetIds), ) # Bootstrap / purge only for personal DataSource (file/folder-based RAG). # FDS RAG is handled by the feature pipeline; the flag alone is enough. if model is DataSource: connectionId = rec.get("connectionId") or rec.get("connection_id") or "" if ragIndexEnabled is True: _ensureConnectionKnowledgeFlag(rootIf, connectionId) from modules.serviceCenter.services.serviceBackgroundJobs import startJob conn = rootIf.getUserConnectionById(connectionId) if connectionId else None authority = "" if conn: authority = conn.authority.value if hasattr(conn.authority, "value") else str(conn.authority or "") await startJob( "connection.bootstrap", {"connectionId": connectionId, "authority": authority.lower(), "dataSourceIds": [sourceId]}, triggeredBy=str(context.user.id), ) elif ragIndexEnabled is False: from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface purgeResult = getKnowledgeInterface(None).deleteFileContentIndexByDataSource(sourceId) logger.info("Purged %d index rows / %d chunks for DataSource %s", purgeResult.get("indexRows", 0), purgeResult.get("chunks", 0), sourceId) import json from modules.shared.auditLogger import audit_logger from modules.datamodels.datamodelAudit import AuditCategory audit_logger.logEvent( userId=str(context.user.id), mandateId=context.mandateId, category=AuditCategory.PERMISSION.value, action="rag_index_toggled", details=json.dumps({"sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled, "resetDescendants": len(resetIds), "model": model.__name__}), ) # 3. Compute effective + ancestors for response updatedAncestors = _computeAncestorEffectives(rootIf, rec, model, "ragIndexEnabled") effectiveRag = _computeOwnEffective(rootIf, rec, model, sourceId, "ragIndexEnabled") return { "sourceId": sourceId, "ragIndexEnabled": ragIndexEnabled, "effectiveRagIndexEnabled": effectiveRag, "resetDescendantIds": resetIds, "updatedAncestors": updatedAncestors, } except HTTPException: raise except Exception as e: logger.error("Error updating datasource ragIndexEnabled: %s", e) raise HTTPException(status_code=500, detail=str(e)) _CLICKUP_SOURCE_TYPES = {"clickup", "clickupList", "clickupSpace", "clickupFolder"} _ALLOWED_RAG_LIMIT_KEYS = { "files": {"maxItems", "maxBytes", "maxFileSize", "maxDepth"}, "clickup": {"maxTasks", "maxWorkspaces", "maxListsPerWorkspace"}, } def _kindForSource(rec: Dict[str, Any], model) -> str: """Map a DataSource record to a RAG-limits kind ('files' or 'clickup'). FeatureDataSource (tables, not file walkers) reports as 'files' so the same UI/limit shape works; the limits simply won't be consumed by any walker today but are stored for forward-compat. """ if model is FeatureDataSource: return "files" sourceType = str(rec.get("sourceType") or "").strip() return "clickup" if sourceType in _CLICKUP_SOURCE_TYPES else "files" def _sanitizeRagLimits(kind: str, raw: Any) -> Dict[str, int]: """Coerce an incoming ragLimits dict to {allowedKey: positive int}. Unknown keys are silently dropped; non-positive or non-numeric values are rejected with 400. """ if not isinstance(raw, dict): raise HTTPException(status_code=400, detail="ragLimits must be an object") allowed = _ALLOWED_RAG_LIMIT_KEYS.get(kind, set()) cleaned: Dict[str, int] = {} for key, value in raw.items(): if key not in allowed: continue try: intValue = int(value) except (TypeError, ValueError): raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be an integer") if intValue <= 0: raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be > 0") cleaned[key] = intValue return cleaned @router.patch("/{sourceId}/settings") @limiter.limit("30/minute") def _updateDataSourceSettings( request: Request, sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"), settings: Dict[str, Any] = Body(..., embed=True), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: """Replace `settings` on a DataSource or FeatureDataSource (partial merge per top-level key). Currently supports `ragLimits` only. Unknown top-level keys in the body are rejected to avoid silently storing garbage that no consumer reads. Owner-only for personal DataSources; mandate/feature scopes additionally accept the mandate or workspace admins of that scope. """ if not isinstance(settings, dict): raise HTTPException(status_code=400, detail="settings must be an object") unknown = set(settings.keys()) - {"ragLimits"} if unknown: raise HTTPException(status_code=400, detail=f"Unknown settings keys: {sorted(unknown)}") try: from modules.interfaces.interfaceDbApp import getRootInterface rootIf = getRootInterface() rec, model = _findSourceRecord(rootIf.db, sourceId) if not rec: raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found") ownerId = str(rec.get("userId") or "") currentUserId = str(context.user.id) if ownerId and ownerId != currentUserId and not context.isSysAdmin: from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag if model is DataSource: connectionId = rec.get("connectionId", "") allDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId}) scope = str(getEffectiveFlag(rec, "scope", allDs, mode="walk")) else: from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource as FDS from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlagFds wsId = rec.get("workspaceInstanceId", "") allFds = rootIf.db.getRecordset(FDS, recordFilter={"workspaceInstanceId": wsId}) scope = str(getEffectiveFlagFds(rec, "scope", allFds, mode="walk")) isMandateAdmin = getattr(context, "isMandateAdmin", False) if scope == "personal" or not isMandateAdmin: raise HTTPException(status_code=403, detail="Not allowed to modify this DataSource's settings") kind = _kindForSource(rec, model) currentSettings = rec.get("settings") or {} if not isinstance(currentSettings, dict): currentSettings = {} newSettings = dict(currentSettings) if "ragLimits" in settings: cleanedLimits = _sanitizeRagLimits(kind, settings["ragLimits"]) mergedLimits = dict(currentSettings.get("ragLimits") or {}) mergedLimits.update(cleanedLimits) newSettings["ragLimits"] = mergedLimits rootIf.db.recordModify(model, sourceId, {"settings": newSettings}) import json from modules.shared.auditLogger import audit_logger from modules.datamodels.datamodelAudit import AuditCategory audit_logger.logEvent( userId=currentUserId, mandateId=context.mandateId, category=AuditCategory.PERMISSION.value, action="datasource_settings_changed", details=json.dumps({ "sourceId": sourceId, "model": model.__name__, "oldSettings": currentSettings, "newSettings": newSettings, }), ) logger.info("Updated settings on %s %s by user %s", model.__name__, sourceId, currentUserId) return {"sourceId": sourceId, "settings": newSettings, "updated": True} except HTTPException: raise except Exception as e: logger.error("Error updating datasource settings: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @router.get("/{sourceId}/cost-estimate") @limiter.limit("60/minute") def _getDataSourceCostEstimate( request: Request, sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: """Return an indicative full-sync cost estimate for the given DataSource. Uses the current effective ragLimits (DataSource.settings.ragLimits with fallback to centralized defaults) as the basis. Returns the same `{estimatedTokens, estimatedUsd, basis}` shape regardless of source kind. """ try: from modules.interfaces.interfaceDbApp import getRootInterface from modules.serviceCenter.services.serviceKnowledge import _ragLimits, _costEstimate rootIf = getRootInterface() rec, model = _findSourceRecord(rootIf.db, sourceId) if not rec: raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found") kind = _kindForSource(rec, model) effective = _ragLimits.getRagLimits(rec, kind) estimate = _costEstimate.estimateBootstrapCost(effective, kind=kind) estimate["sourceId"] = sourceId return estimate except HTTPException: raise except Exception as e: logger.error("Error computing cost estimate: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e))