gateway/modules/routes/routeDataSources.py
2026-03-28 16:59:01 +01:00

97 lines
3.8 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""PATCH endpoints for DataSource and FeatureDataSource scope/neutralize tagging."""
import logging
from typing import Any, Dict
from fastapi import APIRouter, HTTPException, Depends, Path, Request, Body
from modules.auth import limiter, getRequestContext, RequestContext
from modules.auth.authentication import _hasSysAdminRole
from modules.datamodels.datamodelDataSource import DataSource
from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router for the data-source tagging endpoints. Every route registered on it
# is served under the /api/datasources prefix, grouped under the
# "Data Sources" tag, and advertises the error responses listed here.
router = APIRouter(
    prefix="/api/datasources",
    tags=["Data Sources"],
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden"},
        500: {"description": "Internal server error"},
    },
)

# Scope values accepted by the scope PATCH endpoint; anything else is
# rejected with HTTP 400. "global" additionally requires the sysadmin role.
_VALID_SCOPES = {"personal", "featureInstance", "mandate", "global"}
def _findSourceRecord(db, sourceId: str):
    """Resolve a source ID to its record and the model it was found under.

    The ID is tried against ``DataSource`` first and ``FeatureDataSource``
    second; the first truthy result of ``db.getRecord`` wins.

    Args:
        db: Object exposing ``getRecord(model, recordId)``.
        sourceId: ID of the record to look up.

    Returns:
        A ``(record, model)`` tuple for the first match, or ``(None, None)``
        when neither model yields a truthy record.
    """
    # Order matters: DataSource takes precedence over FeatureDataSource.
    for candidateModel in (DataSource, FeatureDataSource):
        found = db.getRecord(candidateModel, sourceId)
        if found:
            return found, candidateModel
    return None, None
@router.patch("/{sourceId}/scope")
@limiter.limit("30/minute")
def _updateDataSourceScope(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    scope: str = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Update the scope of a DataSource or FeatureDataSource.

    Setting the scope to ``"global"`` requires the sysadmin role.

    Args:
        request: Incoming request. Not read by the handler body (presumably
            needed by the rate-limit decorator — confirm).
        sourceId: Path parameter; ID of the DataSource or FeatureDataSource.
        scope: New scope, taken from the ``scope`` key of the JSON body.
            Must be one of ``_VALID_SCOPES``.
        context: Request context resolved via ``getRequestContext``;
            ``context.user`` is checked for the sysadmin role when the
            requested scope is ``"global"``.

    Returns:
        ``{"sourceId": sourceId, "scope": scope, "updated": True}``.

    Raises:
        HTTPException: 400 for an unknown scope, 403 when a non-sysadmin
            requests ``"global"``, 404 when no record matches ``sourceId``,
            500 for any other failure during lookup or update.
    """
    if scope not in _VALID_SCOPES:
        # Sort the allowed values: a raw set repr has no stable order between
        # processes, which made this error message non-deterministic.
        raise HTTPException(
            status_code=400,
            detail=f"Invalid scope: {scope}. Must be one of {sorted(_VALID_SCOPES)}",
        )
    if scope == "global" and not _hasSysAdminRole(context.user):
        raise HTTPException(status_code=403, detail="Only sysadmins can set global scope")

    # NOTE(review): apart from the sysadmin check for "global", no per-record
    # ownership/permission check is visible here and the root interface is
    # used for the write — confirm that any authenticated caller may retag
    # any source.
    try:
        # Kept function-local as in the original (presumably to avoid an
        # import cycle — confirm before hoisting to module level).
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")
        rootIf.db.recordModify(model, sourceId, {"scope": scope})
        logger.info("Updated scope=%s for %s %s", scope, model.__name__, sourceId)
        return {"sourceId": sourceId, "scope": scope, "updated": True}
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) pass through unchanged.
        raise
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Error updating datasource scope: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.patch("/{sourceId}/neutralize")
@limiter.limit("30/minute")
def _updateDataSourceNeutralize(
    request: Request,
    sourceId: str = Path(..., description="ID of the DataSource or FeatureDataSource"),
    neutralize: bool = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Set the neutralization flag on a DataSource or FeatureDataSource.

    Args:
        request: Incoming request. Not read by the handler body (presumably
            needed by the rate-limit decorator — confirm).
        sourceId: Path parameter; ID of the DataSource or FeatureDataSource.
        neutralize: New flag value, taken from the ``neutralize`` key of the
            JSON body.
        context: Request context resolved via ``getRequestContext``. Not read
            by the handler body (presumably declared so the request must pass
            that dependency — confirm).

    Returns:
        ``{"sourceId": sourceId, "neutralize": neutralize, "updated": True}``.

    Raises:
        HTTPException: 404 when no record matches ``sourceId``, 500 for any
            other failure during lookup or update.
    """
    # NOTE(review): no per-record ownership/permission check is visible here
    # and the root interface is used for the write — confirm that any
    # authenticated caller may toggle this flag on any source.
    try:
        # Kept function-local as in the original (presumably to avoid an
        # import cycle — confirm before hoisting to module level).
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootIf = getRootInterface()
        rec, model = _findSourceRecord(rootIf.db, sourceId)
        if not rec:
            raise HTTPException(status_code=404, detail=f"DataSource {sourceId} not found")
        rootIf.db.recordModify(model, sourceId, {"neutralize": neutralize})
        logger.info("Updated neutralize=%s for %s %s", neutralize, model.__name__, sourceId)
        return {"sourceId": sourceId, "neutralize": neutralize, "updated": True}
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) pass through unchanged.
        raise
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Error updating datasource neutralize: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e