gateway/modules/routes/routeDataSources.py

127 lines
5.2 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""PATCH endpoints for DataSource and FeatureDataSource scope/neutralize tagging."""
import logging
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Depends, Path, Request, Body
from modules.auth import limiter, getRequestContext, RequestContext
from modules.datamodels.datamodelDataSource import DataSource
from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource
from modules.shared.i18nRegistry import apiRouteContext
# Translation lookup bound to this route module's message context.
routeApiMsg = apiRouteContext("routeDataSources")
logger = logging.getLogger(__name__)

# Router for all /api/datasources endpoints. The shared response documentation
# is generated from a small status -> description table (order preserved).
router = APIRouter(
    prefix="/api/datasources",
    tags=["Data Sources"],
    responses={
        statusCode: {"description": summary}
        for statusCode, summary in (
            (404, "Not found"),
            (400, "Bad request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (500, "Internal server error"),
        )
    },
)

# Scope values accepted by the scope endpoint.
_VALID_SCOPES = {
    "personal",
    "featureInstance",
    "mandate",
    "global",
}
def _findSourceRecord(db, sourceId: str):
    """Resolve ``sourceId`` to a stored record and the model it was found under.

    ``DataSource`` is queried first; ``FeatureDataSource`` is consulted only when
    the first lookup returns nothing truthy.

    Args:
        db: Object exposing ``getRecord(model, recordId)``.
        sourceId: Identifier to look up.

    Returns:
        A ``(record, modelClass)`` pair, or ``(None, None)`` when neither lookup
        finds a record.
    """
    for candidateModel in (DataSource, FeatureDataSource):
        found = db.getRecord(candidateModel, sourceId)
        if found:
            return found, candidateModel
    return None, None
@router.patch("/{sourceId}/scope")
@limiter.limit("30/minute")
def _updateDataSourceScope(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    scope: str = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Update the scope of a DataSource or FeatureDataSource.

    Args:
        request: Incoming request. Not read in this body; presumably required
            by the ``limiter`` decorator -- confirm before removing.
        sourceId: ID of the record; ``DataSource`` is checked before
            ``FeatureDataSource``.
        scope: New scope, embedded in the JSON body as ``{"scope": ...}``.
            Must be one of ``_VALID_SCOPES``.
        context: Request context; only ``isSysAdmin`` is consulted here.

    Returns:
        ``{"sourceId": ..., "scope": ..., "updated": True}`` on success.

    Raises:
        HTTPException: 400 for an unknown scope, 403 when a non-sysadmin asks
            for ``"global"``, 404 when no record matches ``sourceId``, and 500
            for any other failure during lookup or update.
    """
    if scope not in _VALID_SCOPES:
        # Sort the allowed values so the message is stable; interpolating the
        # raw set would vary with hash ordering between processes.
        allowedScopes = ", ".join(sorted(_VALID_SCOPES))
        raise HTTPException(
            status_code=400,
            detail=f"Invalid scope: {scope}. Must be one of {allowedScopes}",
        )
    if scope == "global" and not context.isSysAdmin:
        raise HTTPException(status_code=403, detail=routeApiMsg("Only sysadmins can set global scope"))

    # NOTE(review): apart from the sysAdmin check above, nothing here verifies
    # that the caller may modify this particular record -- confirm that
    # authorization is enforced elsewhere.
    try:
        # Imported at call time, as in the original; an import failure is
        # therefore reported as a 500 by the handler below.
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")
        rootIf.db.recordModify(model, sourceId, {"scope": scope})
        logger.info("Updated scope=%s for %s %s", scope, model.__name__, sourceId)
        return {"sourceId": sourceId, "scope": scope, "updated": True}
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) pass through unchanged.
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error updating datasource scope: %s", e)
        # NOTE(review): detail exposes the raw exception text to the client;
        # kept for compatibility, consider a generic message.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.patch("/{sourceId}/neutralize")
@limiter.limit("30/minute")
def _updateDataSourceNeutralize(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    neutralize: bool = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Set the neutralization flag on a DataSource or FeatureDataSource.

    Args:
        request: Incoming request. Not read in this body; presumably required
            by the ``limiter`` decorator -- confirm before removing.
        sourceId: ID of the record; ``DataSource`` is checked before
            ``FeatureDataSource``.
        neutralize: New flag value, embedded in the JSON body as
            ``{"neutralize": ...}``.
        context: Request context resolved by ``getRequestContext``; not read
            in this body.

    Returns:
        ``{"sourceId": ..., "neutralize": ..., "updated": True}`` on success.

    Raises:
        HTTPException: 404 when no record matches ``sourceId``; 500 for any
            other failure during lookup or update.
    """
    # NOTE(review): `context` is never consulted here, so no per-record
    # authorization is visible in this handler -- confirm it is enforced
    # elsewhere.
    try:
        # Imported at call time, as in the original; an import failure is
        # therefore reported as a 500 by the handler below.
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")
        rootIf.db.recordModify(model, sourceId, {"neutralize": neutralize})
        logger.info("Updated neutralize=%s for %s %s", neutralize, model.__name__, sourceId)
        return {"sourceId": sourceId, "neutralize": neutralize, "updated": True}
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) pass through unchanged.
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error updating datasource neutralize: %s", e)
        # NOTE(review): detail exposes the raw exception text to the client;
        # kept for compatibility, consider a generic message.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.patch("/{sourceId}/neutralize-fields")
@limiter.limit("30/minute")
def _updateNeutralizeFields(
    request: Request,
    sourceId: str = Path(..., description="ID of the FeatureDataSource"),
    neutralizeFields: List[str] = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Replace the list of field names to neutralize on a FeatureDataSource.

    Only ``FeatureDataSource`` records are considered; ``DataSource`` is not
    consulted by this endpoint.

    Args:
        request: Incoming request. Not read in this body; presumably required
            by the ``limiter`` decorator -- confirm before removing.
        sourceId: ID of the FeatureDataSource.
        neutralizeFields: Field names, embedded in the JSON body as
            ``{"neutralizeFields": [...]}``. Empty or non-string entries are
            dropped.
        context: Request context resolved by ``getRequestContext``; not read
            in this body.

    Returns:
        ``{"sourceId": ..., "neutralizeFields": [...], "updated": True}``. The
        returned list is the filtered one; when it is empty the stored value
        is ``None`` while the response still carries ``[]``.

    Raises:
        HTTPException: 404 when no FeatureDataSource matches ``sourceId``; 500
            for any other failure during lookup or update.
    """
    # NOTE(review): `context` is never consulted here, so no per-record
    # authorization is visible in this handler -- confirm it is enforced
    # elsewhere.
    try:
        # Imported at call time, as in the original; an import failure is
        # therefore reported as a 500 by the handler below.
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootIf = getRootInterface()
        rec = rootIf.db.getRecord(FeatureDataSource, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"FeatureDataSource {sourceId} not found")

        # Keep only non-empty string entries; a falsy payload yields [].
        cleanFields = [
            fieldName
            for fieldName in (neutralizeFields or [])
            if fieldName and isinstance(fieldName, str)
        ]
        rootIf.db.recordModify(FeatureDataSource, sourceId, {
            # An empty selection is persisted as None rather than [].
            "neutralizeFields": cleanFields or None,
        })
        logger.info("Updated neutralizeFields=%s for FeatureDataSource %s", cleanFields, sourceId)
        return {"sourceId": sourceId, "neutralizeFields": cleanFields, "updated": True}
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) pass through unchanged.
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error updating neutralizeFields: %s", e)
        # NOTE(review): detail exposes the raw exception text to the client;
        # kept for compatibility, consider a generic message.
        raise HTTPException(status_code=500, detail=str(e)) from e