199 lines
No EOL
6.6 KiB
Python
199 lines
No EOL
6.6 KiB
Python
from fastapi import APIRouter, HTTPException, Depends, Body, Path
|
|
from typing import List, Dict, Any
|
|
from fastapi import status
|
|
from datetime import datetime
|
|
from dataclasses import dataclass
|
|
|
|
# Import interfaces
|
|
from modules.auth import getCurrentActiveUser, getUserContext
|
|
from modules.gatewayInterface import getGatewayInterface
|
|
from modules.gatewayModel import Mandate
|
|
|
|
# Determine all attributes of the model
|
|
def getModelAttributes(modelClass):
    """Return the public, non-callable attribute names of a model class.

    Names are taken from ``dir(modelClass)`` (so they come back sorted) and
    kept only when they

    * do not start with an underscore,
    * are not one of the excluded bookkeeping names listed below, and
    * do not resolve to a callable on the class.

    The name-based filters run first so that ``getattr`` is never invoked on
    private, dunder, or excluded attributes. Those lookups can execute
    descriptor code, which may be slow or may raise, and their result would
    be discarded anyway.

    Args:
        modelClass: The class to inspect.

    Returns:
        A list of attribute names that passed all three filters.
    """
    excludedNames = frozenset(
        ('metadata', 'query', 'query_class', 'label', 'field_labels')
    )
    return [
        attr for attr in dir(modelClass)
        if not attr.startswith('_')
        and attr not in excludedNames
        and not callable(getattr(modelClass, attr))
    ]
|
|
|
|
# Model attributes for Mandate
|
|
# Evaluated once at import time; createMandate and updateMandate use this list
# to decide which keys of the request body are passed on to the gateway interface.
mandateAttributes = getModelAttributes(Mandate)
|
|
|
|
@dataclass
class AppContext:
    """Per-request bundle of the caller's identifiers and gateway interface.

    Instances are built by ``getContext`` and handed to the endpoint
    handlers so they have everything needed to talk to the gateway.
    """

    # Mandate id resolved for the current user
    mandateId: int
    # User id resolved for the current user
    userId: int
    # Gateway interface object created for the mandate/user pair
    interfaceData: Any
|
|
|
|
async def getContext(currentUser: Dict[str, Any]) -> AppContext:
    """Build the AppContext for the given authenticated user.

    Resolves the mandate and user ids via ``getUserContext`` and creates the
    matching gateway interface via ``getGatewayInterface``.

    Args:
        currentUser: The user dictionary supplied by the auth dependency.

    Returns:
        An AppContext carrying both ids and the gateway interface.
    """
    resolvedIds = await getUserContext(currentUser)
    mandateId, userId = resolvedIds

    gateway = getGatewayInterface(mandateId, userId)
    return AppContext(mandateId=mandateId, userId=userId, interfaceData=gateway)
|
|
|
|
# Create router for mandate endpoints
|
|
router = APIRouter(
    prefix="/api/mandates",  # all routes in this module are registered under this prefix
    tags=["Mandates"],
    responses={404: {"description": "Not found"}},  # shared 404 response description
)
|
|
|
|
@router.get("", response_model=List[Dict[str, Any]])
async def getMandates(currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)):
    """Return all mandates; restricted to callers with ``sysadmin`` privilege.

    Raises:
        HTTPException: 403 when the caller's privilege is not ``sysadmin``.
    """
    # The context is resolved before the privilege test, so any failure in
    # getContext is reported first.
    appContext = await getContext(currentUser)

    callerIsSysadmin = currentUser.get("privilege") == "sysadmin"
    if not callerIsSysadmin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only system administrators can access all mandates"
        )

    gateway = appContext.interfaceData
    return gateway.getAllMandates()
|
|
|
|
@router.post("", response_model=Dict[str, Any])
async def createMandate(
    mandate: Dict[str, Any] = Body(...),
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Create a mandate from the request body; ``sysadmin`` callers only.

    Only keys listed in ``mandateAttributes`` are taken from the body. A
    missing ``name`` defaults to "New Mandate" and a missing ``language``
    defaults to "de".

    Raises:
        HTTPException: 403 when the caller's privilege is not ``sysadmin``.
    """
    appContext = await getContext(currentUser)

    callerIsSysadmin = currentUser.get("privilege") == "sysadmin"
    if not callerIsSysadmin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only system administrators can create mandates"
        )

    # Keep only the keys that are known model attributes
    payload = {key: mandate[key] for key in mandateAttributes if key in mandate}

    # Fill in fallbacks for fields the client left out
    for key, fallback in (("name", "New Mandate"), ("language", "de")):
        payload.setdefault(key, fallback)

    return appContext.interfaceData.createMandate(**payload)
|
|
|
|
@router.get("/{mandateId}", response_model=Dict[str, Any])
async def getMandate(
    mandateId: int,
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Return a single mandate by id.

    A ``sysadmin`` may read any mandate; every other caller may only read
    the mandate resolved for their own context.

    Args:
        mandateId: Id of the mandate to fetch (path parameter).
        currentUser: The authenticated user supplied by the auth dependency.

    Raises:
        HTTPException: 403 when a non-sysadmin requests a foreign mandate,
            404 when the gateway returns no mandate for the id.
    """
    context = await getContext(currentUser)

    # Permission check.
    # The previous condition only rejected callers whose privilege was
    # exactly "admin"; any other privilege value (or a missing one) slipped
    # through and could read foreign mandates. Deny by default instead.
    isSysadmin = currentUser.get("privilege") == "sysadmin"
    isOwnMandate = context.mandateId == mandateId

    if not (isSysadmin or isOwnMandate):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to access this mandate"
        )

    # Get mandate
    mandate = context.interfaceData.getMandate(mandateId)
    if not mandate:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Mandate with ID {mandateId} not found"
        )

    return mandate
|
|
|
|
@router.put("/{mandateId}", response_model=Dict[str, Any])
async def updateMandate(
    mandateId: int = Path(..., description="ID of the mandate to update"),
    mandateData: Dict[str, Any] = Body(..., description="Updated mandate data"),
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Update an existing mandate.

    Allowed for ``sysadmin`` on any mandate and for ``admin`` on the mandate
    resolved for their own context. Only keys listed in ``mandateAttributes``
    are forwarded to the gateway.

    Args:
        mandateId: Id of the mandate to update (path parameter).
        mandateData: Request body with the fields to change.
        currentUser: The authenticated user supplied by the auth dependency.

    Raises:
        HTTPException: 403 when the caller lacks permission, 404 when the
            gateway returns no mandate for the id.
    """
    context = await getContext(currentUser)

    # Permission check (done before the existence lookup, as in getMandate,
    # so unauthorized callers cannot probe which mandate ids exist).
    # The previous condition only rejected "admin" callers on foreign
    # mandates; any other privilege value bypassed the check entirely.
    # NOTE(review): non-admin users are now denied even on their own mandate;
    # confirm that is the intended policy for writes.
    privilege = currentUser.get("privilege")
    isSysadmin = privilege == "sysadmin"
    isOwnMandateAdmin = privilege == "admin" and context.mandateId == mandateId

    if not (isSysadmin or isOwnMandateAdmin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to update this mandate"
        )

    # Mandate exists?
    mandate = context.interfaceData.getMandate(mandateId)
    if not mandate:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Mandate with ID {mandateId} not found"
        )

    # Forward only keys that are known model attributes
    updateData = {
        attr: mandateData[attr]
        for attr in mandateAttributes
        if attr in mandateData
    }

    # Update mandate
    return context.interfaceData.updateMandate(
        mandateId=mandateId,
        mandateData=updateData
    )
|
|
|
|
@router.delete("/{mandateId}", status_code=status.HTTP_204_NO_CONTENT)
async def deleteMandate(
    mandateId: int = Path(..., description="ID of the mandate to delete"),
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Delete a mandate, including all associated users and referenced objects.

    Allowed for ``sysadmin`` on any mandate and for ``admin`` on the mandate
    resolved for their own context.

    Args:
        mandateId: Id of the mandate to delete (path parameter).
        currentUser: The authenticated user supplied by the auth dependency.

    Raises:
        HTTPException: 403 when the caller lacks permission, 404 when the
            gateway returns no mandate for the id, 500 when the gateway
            reports that the deletion did not succeed.
    """
    context = await getContext(currentUser)

    # Permission check (done before the existence lookup, as in getMandate,
    # so unauthorized callers cannot probe which mandate ids exist).
    # The previous condition only rejected "admin" callers on foreign
    # mandates; any other privilege value bypassed the check entirely and
    # could delete arbitrary mandates.
    # NOTE(review): non-admin users are now denied even on their own mandate;
    # confirm that is the intended policy for deletion.
    privilege = currentUser.get("privilege")
    isSysadmin = privilege == "sysadmin"
    isOwnMandateAdmin = privilege == "admin" and context.mandateId == mandateId

    if not (isSysadmin or isOwnMandateAdmin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to delete this mandate"
        )

    # Mandate exists?
    mandate = context.interfaceData.getMandate(mandateId)
    if not mandate:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Mandate with ID {mandateId} not found"
        )

    # Delete mandate
    success = context.interfaceData.deleteMandate(mandateId)
    if not success:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting mandate with ID {mandateId}"
        )

    return None