platform-core/modules/routes/routeDataSources.py
2026-05-19 16:48:01 +02:00

518 lines
22 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""PATCH endpoints for DataSource and FeatureDataSource scope/neutralize/rag-index tagging."""
import logging
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Depends, Path, Request, Body
from modules.auth import limiter, getRequestContext, RequestContext
from modules.datamodels.datamodelDataSource import DataSource
from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource
from modules.datamodels.datamodelUam import UserConnection
from modules.shared.i18nRegistry import apiRouteContext
# Route-scoped message helper obtained from the i18n registry; used below to
# wrap user-facing error text.
routeApiMsg = apiRouteContext("routeDataSources")
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def _ensureConnectionKnowledgeFlag(rootIf, connectionId: str) -> None:
    """Switch on `knowledgeIngestionEnabled` for the parent UserConnection.

    This is a one-way sync used when a DataSource gets RAG-activated. It is
    deliberately NOT bidirectional: disabling the last DataSource never clears
    the flag, because consent may have been granted explicitly via the
    Connections page / wizard before any DataSource existed. Only the master
    switch (`/knowledge-consent`) may clear it.

    Best effort: any exception is logged as a warning and swallowed.
    """
    if not connectionId:
        return
    try:
        parentConn = rootIf.db.getRecord(UserConnection, connectionId)
        # Nothing to do for an unknown connection or one that is already enabled.
        if not parentConn or parentConn.get("knowledgeIngestionEnabled"):
            return
        rootIf.db.recordModify(UserConnection, connectionId, {"knowledgeIngestionEnabled": True})
        logger.info(
            "Auto-enabled knowledgeIngestionEnabled on UserConnection %s "
            "(triggered by first active DataSource).",
            connectionId,
        )
    except Exception as exc:
        logger.warning("Could not auto-enable knowledgeIngestionEnabled for connection %s: %s", connectionId, exc)
def _computeOwnEffective(rootIf, rec, model, sourceId: str, flag: str) -> Any:
    """Reload record `sourceId` and return its aggregate effective `flag` value.

    The record is re-read from the database so the result reflects the value
    just written by the caller. `rec` is accepted for signature symmetry but is
    not read here. Returns None when the record can no longer be found.
    """
    from modules.serviceCenter.services.serviceKnowledge._inheritFlags import (
        getEffectiveFlag, getEffectiveFlagFds,
    )

    reloaded = rootIf.db.getRecord(model, sourceId)
    if not reloaded:
        return None

    # DataSources are grouped by connection, FeatureDataSources by workspace instance.
    if model is DataSource:
        peerModel, groupKey, resolve = DataSource, "connectionId", getEffectiveFlag
    else:
        peerModel, groupKey, resolve = FeatureDataSource, "workspaceInstanceId", getEffectiveFlagFds

    groupId = reloaded.get(groupKey, "")
    peers = rootIf.db.getRecordset(peerModel, recordFilter={groupKey: groupId})
    return resolve(reloaded, flag, peers, mode="aggregate")
def _computeAncestorEffectives(rootIf, rec, model, flag: str) -> List[Dict[str, Any]]:
    """Return the aggregate effective `flag` value for every ancestor of `rec`.

    Each list entry has the ancestor's `id` plus one key named
    `effective<Flag>` (the flag name with its first letter upper-cased).
    """
    from modules.serviceCenter.services.serviceKnowledge._inheritFlags import (
        collectAncestorChain, collectAncestorChainFds,
        getEffectiveFlag, getEffectiveFlagFds,
    )

    effectiveKey = f"effective{flag[0].upper()}{flag[1:]}"

    # Load the sibling set once; it serves both the chain walk and the resolver.
    if model is DataSource:
        peers = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": rec.get("connectionId", "")})
        chain = collectAncestorChain(rec, peers)
        resolve = getEffectiveFlag
    else:
        peers = rootIf.db.getRecordset(
            FeatureDataSource, recordFilter={"workspaceInstanceId": rec.get("workspaceInstanceId", "")}
        )
        chain = collectAncestorChainFds(rec, peers)
        resolve = getEffectiveFlagFds

    results: List[Dict[str, Any]] = []
    for ancestor in chain:
        # Prefer the mapping key; fall back to an `id` attribute when it is empty.
        ancestorId = ancestor.get("id") or getattr(ancestor, "id", "")
        results.append({"id": ancestorId, effectiveKey: resolve(ancestor, flag, peers, mode="aggregate")})
    return results
# Router for the DataSource / FeatureDataSource tagging endpoints defined in
# this module; all routes are registered under the /api/datasources prefix.
router = APIRouter(
    prefix="/api/datasources",
    tags=["Data Sources"],
    # Shared response descriptions declared at router level.
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden"},
        500: {"description": "Internal server error"},
    },
)
# Scope values accepted by the PATCH /{sourceId}/scope endpoint.
_VALID_SCOPES = {"personal", "featureInstance", "mandate", "global"}
def _findSourceRecord(db, sourceId: str):
    """Resolve `sourceId` to a `(record, model)` pair.

    DataSource is tried first, then FeatureDataSource. Returns `(None, None)`
    when neither table has a matching record.
    """
    for candidateModel in (DataSource, FeatureDataSource):
        found = db.getRecord(candidateModel, sourceId)
        if found:
            return found, candidateModel
    return None, None
@router.patch("/{sourceId}/scope")
@limiter.limit("30/minute")
def _updateDataSourceScope(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    scope: Optional[str] = Body(None, embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Update the scope of a DataSource or FeatureDataSource.

    A non-null `scope` cascade-resets explicit descendants before the node
    itself is written. `scope=None` resets this node to inherit (no cascade).
    Global scope requires sysAdmin.

    Returns:
        Dict with `sourceId`, the requested `scope`, the recomputed
        `effectiveScope`, `resetDescendantIds` and `updatedAncestors`.

    Raises:
        HTTPException: 400 for an unknown scope, 403 when a non-sysadmin
            requests global scope, 404 when no record matches `sourceId`,
            500 for any other failure.
    """
    if scope is not None:
        if scope not in _VALID_SCOPES:
            # sorted() keeps the message stable; a raw set renders in arbitrary order.
            raise HTTPException(
                status_code=400,
                detail=f"Invalid scope: {scope}. Must be one of {sorted(_VALID_SCOPES)}",
            )
        if scope == "global" and not context.isSysAdmin:
            raise HTTPException(status_code=403, detail=routeApiMsg("Only sysadmins can set global scope"))
    # NOTE(review): unlike the /settings endpoint, this handler performs no
    # owner / mandate-admin check on the record. Confirm access is enforced upstream.
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.serviceCenter.services.serviceKnowledge._inheritFlags import (
            cascadeResetDescendants, cascadeResetDescendantsFds,
        )

        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")

        # 1. Cascade reset descendants bottom-up (before modifying master)
        resetIds: List[str] = []
        if scope is not None:
            if model is DataSource:
                resetIds = cascadeResetDescendants(rootIf, rec, "scope")
            else:
                resetIds = cascadeResetDescendantsFds(rootIf, rec, "scope")

        # 2. Set master value last (crash-safe)
        rootIf.db.recordModify(model, sourceId, {"scope": scope})

        # 3. Compute effective + ancestor chain for response
        updatedAncestors = _computeAncestorEffectives(rootIf, rec, model, "scope")
        effectiveScope = _computeOwnEffective(rootIf, rec, model, sourceId, "scope")

        logger.info(
            "Updated scope=%s for %s %s (cascade-reset %d descendants)",
            scope, model.__name__, sourceId, len(resetIds),
        )
        return {
            "sourceId": sourceId,
            "scope": scope,
            "effectiveScope": effectiveScope,
            "resetDescendantIds": resetIds,
            "updatedAncestors": updatedAncestors,
        }
    except HTTPException:
        raise
    except Exception as e:
        # Log the traceback so the failure stays diagnosable from the server log.
        logger.error("Error updating datasource scope: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.patch("/{sourceId}/neutralize")
@limiter.limit("30/minute")
def _updateDataSourceNeutralize(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    neutralize: Optional[bool] = Body(None, embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Set the neutralize flag on a DataSource or FeatureDataSource.

    A non-null `neutralize` cascade-resets explicit descendants before the
    node itself is written. `neutralize=None` resets this node to inherit
    (no cascade).

    Returns:
        Dict with `sourceId`, the requested `neutralize`, the recomputed
        `effectiveNeutralize`, `resetDescendantIds` and `updatedAncestors`.

    Raises:
        HTTPException: 404 when no record matches `sourceId`, 500 for any
            other failure.
    """
    # NOTE(review): unlike the /settings endpoint, this handler performs no
    # owner / mandate-admin check on the record. Confirm access is enforced upstream.
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.serviceCenter.services.serviceKnowledge._inheritFlags import (
            cascadeResetDescendants, cascadeResetDescendantsFds,
        )

        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")

        # 1. Cascade reset descendants bottom-up (before modifying master)
        resetIds: List[str] = []
        if neutralize is not None:
            if model is DataSource:
                resetIds = cascadeResetDescendants(rootIf, rec, "neutralize")
            else:
                resetIds = cascadeResetDescendantsFds(rootIf, rec, "neutralize")

        # 2. Set master value last (crash-safe)
        rootIf.db.recordModify(model, sourceId, {"neutralize": neutralize})

        # 3. Compute effective + ancestor chain for response
        updatedAncestors = _computeAncestorEffectives(rootIf, rec, model, "neutralize")
        effectiveNeutralize = _computeOwnEffective(rootIf, rec, model, sourceId, "neutralize")

        logger.info(
            "Updated neutralize=%s for %s %s (cascade-reset %d descendants)",
            neutralize, model.__name__, sourceId, len(resetIds),
        )
        return {
            "sourceId": sourceId,
            "neutralize": neutralize,
            "effectiveNeutralize": effectiveNeutralize,
            "resetDescendantIds": resetIds,
            "updatedAncestors": updatedAncestors,
        }
    except HTTPException:
        raise
    except Exception as e:
        # Log the traceback so the failure stays diagnosable from the server log.
        logger.error("Error updating datasource neutralize: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.patch("/{sourceId}/neutralize-fields")
@limiter.limit("30/minute")
def _updateNeutralizeFields(
    request: Request,
    sourceId: str = Path(..., description="ID of the FeatureDataSource"),
    neutralizeFields: List[str] = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Update the list of field names to neutralize on a FeatureDataSource.

    Empty and non-string entries are dropped. An empty result is stored as
    None, while the response always carries the cleaned list.

    Returns:
        Dict with `sourceId`, the cleaned `neutralizeFields` and `updated`.

    Raises:
        HTTPException: 404 when the FeatureDataSource does not exist, 500 for
            any other failure.
    """
    # NOTE(review): unlike the /settings endpoint, this handler performs no
    # owner / mandate-admin check on the record. Confirm access is enforced upstream.
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootIf = getRootInterface()
        rec = rootIf.db.getRecord(FeatureDataSource, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"FeatureDataSource {sourceId} not found")

        cleanFields = [f for f in neutralizeFields if f and isinstance(f, str)] if neutralizeFields else []
        rootIf.db.recordModify(FeatureDataSource, sourceId, {
            "neutralizeFields": cleanFields if cleanFields else None,
        })
        logger.info("Updated neutralizeFields=%s for FeatureDataSource %s", cleanFields, sourceId)
        return {"sourceId": sourceId, "neutralizeFields": cleanFields, "updated": True}
    except HTTPException:
        raise
    except Exception as e:
        # Log the traceback so the failure stays diagnosable from the server log.
        logger.error("Error updating neutralizeFields: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.patch("/{sourceId}/rag-index")
@limiter.limit("30/minute")
async def _updateDataSourceRagIndex(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource"),
    ragIndexEnabled: Optional[bool] = Body(None, embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Set RAG indexing flag on a DataSource. Cascade-resets explicit descendants.

    `ragIndexEnabled=None` resets this node to inherit (no cascade, no purge,
    no bootstrap — the node simply follows its ancestor chain afterwards).
    `True` enqueues a mini-bootstrap. `False` synchronously purges chunks.

    Must be `async def` so `await startJob(...)` registers `_runJob` in the
    main event loop.

    Returns:
        Dict with `sourceId`, the requested `ragIndexEnabled`, the recomputed
        `effectiveRagIndexEnabled`, `resetDescendantIds` and `updatedAncestors`.

    Raises:
        HTTPException: 404 when no record matches `sourceId`, 500 for any
            other failure.
    """
    # NOTE(review): unlike the /settings endpoint, this handler performs no
    # owner / mandate-admin check on the record. Confirm access is enforced upstream.
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.serviceCenter.services.serviceKnowledge._inheritFlags import (
            cascadeResetDescendants, cascadeResetDescendantsFds,
        )

        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")

        # 1. Cascade reset descendants bottom-up (before modifying master)
        resetIds: List[str] = []
        if ragIndexEnabled is not None:
            if model is DataSource:
                resetIds = cascadeResetDescendants(rootIf, rec, "ragIndexEnabled")
            else:
                resetIds = cascadeResetDescendantsFds(rootIf, rec, "ragIndexEnabled")

        # 2. Set master value last (crash-safe)
        rootIf.db.recordModify(model, sourceId, {"ragIndexEnabled": ragIndexEnabled})
        logger.info(
            "Updated ragIndexEnabled=%s for %s %s (cascade-reset %d descendants)",
            ragIndexEnabled, model.__name__, sourceId, len(resetIds),
        )

        # Bootstrap / purge only for personal DataSource (file/folder-based RAG).
        # FDS RAG is handled by the feature pipeline; the flag alone is enough.
        if model is DataSource:
            connectionId = rec.get("connectionId") or rec.get("connection_id") or ""
            if ragIndexEnabled is True:
                _ensureConnectionKnowledgeFlag(rootIf, connectionId)
                from modules.serviceCenter.services.serviceBackgroundJobs import startJob

                conn = rootIf.getUserConnectionById(connectionId) if connectionId else None
                authority = ""
                if conn:
                    authority = conn.authority.value if hasattr(conn.authority, "value") else str(conn.authority or "")
                # NOTE(review): the job is enqueued even when connectionId is empty;
                # confirm the bootstrap job tolerates that.
                await startJob(
                    "connection.bootstrap",
                    {"connectionId": connectionId, "authority": authority.lower(), "dataSourceIds": [sourceId]},
                    triggeredBy=str(context.user.id),
                )
            elif ragIndexEnabled is False:
                from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface

                # Guard against a None result: the flag is already persisted, so a
                # logging-only failure here must not turn the request into a 500.
                purgeResult = getKnowledgeInterface(None).deleteFileContentIndexByDataSource(sourceId) or {}
                logger.info(
                    "Purged %d index rows / %d chunks for DataSource %s",
                    purgeResult.get("indexRows", 0), purgeResult.get("chunks", 0), sourceId,
                )

        import json
        from modules.shared.auditLogger import audit_logger
        from modules.datamodels.datamodelAudit import AuditCategory

        audit_logger.logEvent(
            userId=str(context.user.id),
            mandateId=context.mandateId,
            category=AuditCategory.PERMISSION.value,
            action="rag_index_toggled",
            details=json.dumps({
                "sourceId": sourceId,
                "ragIndexEnabled": ragIndexEnabled,
                "resetDescendants": len(resetIds),
                "model": model.__name__,
            }),
        )

        # 3. Compute effective + ancestors for response
        updatedAncestors = _computeAncestorEffectives(rootIf, rec, model, "ragIndexEnabled")
        effectiveRag = _computeOwnEffective(rootIf, rec, model, sourceId, "ragIndexEnabled")
        return {
            "sourceId": sourceId,
            "ragIndexEnabled": ragIndexEnabled,
            "effectiveRagIndexEnabled": effectiveRag,
            "resetDescendantIds": resetIds,
            "updatedAncestors": updatedAncestors,
        }
    except HTTPException:
        raise
    except Exception as e:
        # Log the traceback so the failure stays diagnosable from the server log.
        logger.error("Error updating datasource ragIndexEnabled: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
# DataSource `sourceType` values that _kindForSource maps to the "clickup" limits kind.
_CLICKUP_SOURCE_TYPES = {"clickup", "clickupList", "clickupSpace", "clickupFolder"}
# Per-kind whitelist of ragLimits keys; _sanitizeRagLimits drops every other key.
_ALLOWED_RAG_LIMIT_KEYS = {
    "files": {"maxItems", "maxBytes", "maxFileSize", "maxDepth"},
    "clickup": {"maxTasks", "maxWorkspaces", "maxListsPerWorkspace"},
}
def _kindForSource(rec: Dict[str, Any], model) -> str:
    """Return the RAG-limits kind for a source record: 'clickup' or 'files'.

    Only a DataSource whose `sourceType` is one of the ClickUp types yields
    'clickup'. Everything else reports 'files', including every
    FeatureDataSource (tables, not file walkers), so the same UI/limit shape
    works; those limits are not consumed by any walker today but are stored
    for forward-compat.
    """
    if model is not FeatureDataSource:
        declaredType = str(rec.get("sourceType") or "").strip()
        if declaredType in _CLICKUP_SOURCE_TYPES:
            return "clickup"
    return "files"
def _sanitizeRagLimits(kind: str, raw: Any) -> Dict[str, int]:
    """Coerce an incoming ragLimits dict to {allowedKey: positive int}.

    Keys not whitelisted for `kind` in `_ALLOWED_RAG_LIMIT_KEYS` are silently
    dropped. Integers, integral floats and numeric strings are accepted and
    stored as `int`.

    Raises:
        HTTPException: 400 when `raw` is not a dict, when a value is a bool,
            a fractional or non-finite float, or otherwise not convertible to
            an integer, or when the resulting integer is not > 0.
    """
    if not isinstance(raw, dict):
        raise HTTPException(status_code=400, detail="ragLimits must be an object")
    allowed = _ALLOWED_RAG_LIMIT_KEYS.get(kind, set())
    cleaned: Dict[str, int] = {}
    for key, value in raw.items():
        if key not in allowed:
            continue
        notAnInteger = f"ragLimits.{key} must be an integer"
        # bool is an int subclass: int(True) == 1 would otherwise be stored as a limit.
        if isinstance(value, bool):
            raise HTTPException(status_code=400, detail=notAnInteger)
        try:
            intValue = int(value)
        except (TypeError, ValueError, OverflowError) as exc:
            # int(float("inf")) raises OverflowError; NaN raises ValueError.
            raise HTTPException(status_code=400, detail=notAnInteger) from exc
        # Reject fractional floats instead of silently truncating them (1.9 -> 1).
        if isinstance(value, float) and value != intValue:
            raise HTTPException(status_code=400, detail=notAnInteger)
        if intValue <= 0:
            raise HTTPException(status_code=400, detail=f"ragLimits.{key} must be > 0")
        cleaned[key] = intValue
    return cleaned
@router.patch("/{sourceId}/settings")
@limiter.limit("30/minute")
def _updateDataSourceSettings(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    settings: Dict[str, Any] = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Merge the submitted `settings` into a DataSource or FeatureDataSource.

    Only the `ragLimits` top-level key is supported; any other key is rejected
    with 400 so nothing is stored that no consumer reads. Submitted limits are
    sanitized and merged key-by-key over the stored ones.

    A caller who is neither the record owner nor a sysadmin is allowed only
    when the record's effective scope is not "personal" and the request
    context reports `isMandateAdmin`.
    """
    if not isinstance(settings, dict):
        raise HTTPException(status_code=400, detail="settings must be an object")
    unknown = {key for key in settings if key != "ragLimits"}
    if unknown:
        raise HTTPException(status_code=400, detail=f"Unknown settings keys: {sorted(unknown)}")

    try:
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")

        ownerId = str(rec.get("userId") or "")
        currentUserId = str(context.user.id)

        # Permission gate for callers other than the owner / a sysadmin.
        if ownerId and ownerId != currentUserId and not context.isSysAdmin:
            from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag

            if model is DataSource:
                peers = rootIf.db.getRecordset(
                    DataSource, recordFilter={"connectionId": rec.get("connectionId", "")}
                )
                scope = str(getEffectiveFlag(rec, "scope", peers, mode="walk"))
            else:
                from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource as FDS
                from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlagFds

                peers = rootIf.db.getRecordset(
                    FDS, recordFilter={"workspaceInstanceId": rec.get("workspaceInstanceId", "")}
                )
                scope = str(getEffectiveFlagFds(rec, "scope", peers, mode="walk"))

            mayAdminister = getattr(context, "isMandateAdmin", False) and scope != "personal"
            if not mayAdminister:
                raise HTTPException(status_code=403, detail="Not allowed to modify this DataSource's settings")

        kind = _kindForSource(rec, model)

        currentSettings = rec.get("settings") or {}
        if not isinstance(currentSettings, dict):
            currentSettings = {}

        # Work on a copy so the audit entry can report both old and new state.
        newSettings = dict(currentSettings)
        if "ragLimits" in settings:
            incomingLimits = _sanitizeRagLimits(kind, settings["ragLimits"])
            combinedLimits = dict(currentSettings.get("ragLimits") or {})
            combinedLimits.update(incomingLimits)
            newSettings["ragLimits"] = combinedLimits

        rootIf.db.recordModify(model, sourceId, {"settings": newSettings})

        import json
        from modules.shared.auditLogger import audit_logger
        from modules.datamodels.datamodelAudit import AuditCategory

        auditPayload = {
            "sourceId": sourceId,
            "model": model.__name__,
            "oldSettings": currentSettings,
            "newSettings": newSettings,
        }
        audit_logger.logEvent(
            userId=currentUserId,
            mandateId=context.mandateId,
            category=AuditCategory.PERMISSION.value,
            action="datasource_settings_changed",
            details=json.dumps(auditPayload),
        )

        logger.info("Updated settings on %s %s by user %s", model.__name__, sourceId, currentUserId)
        return {"sourceId": sourceId, "settings": newSettings, "updated": True}
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error updating datasource settings: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/{sourceId}/cost-estimate")
@limiter.limit("60/minute")
def _getDataSourceCostEstimate(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Return an indicative full-sync cost estimate for one data source.

    The estimate is based on the limits returned by `_ragLimits.getRagLimits`
    for the record and its kind (per the original contract:
    DataSource.settings.ragLimits with fallback to centralized defaults).
    The estimator's result is returned with `sourceId` added, in the same
    `{estimatedTokens, estimatedUsd, basis}` shape regardless of source kind.
    """
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.serviceCenter.services.serviceKnowledge import _ragLimits, _costEstimate

        rootIf = getRootInterface()
        record, model = _findSourceRecord(rootIf.db, sourceId)
        if not record:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")

        sourceKind = _kindForSource(record, model)
        activeLimits = _ragLimits.getRagLimits(record, sourceKind)
        result = _costEstimate.estimateBootstrapCost(activeLimits, kind=sourceKind)
        result["sourceId"] = sourceId
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error computing cost estimate: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))