216 lines
No EOL
7.4 KiB
Python
216 lines
No EOL
7.4 KiB
Python
from fastapi import APIRouter, HTTPException, Depends, Body, Path
|
|
from typing import List, Dict, Any
|
|
from fastapi import status
|
|
from datetime import datetime
|
|
from dataclasses import dataclass
|
|
|
|
# Import interfaces
|
|
from modules.auth import get_current_active_user, get_user_context
|
|
from modules.gateway_interface import get_gateway_interface
|
|
from modules.gateway_model import Mandate
|
|
|
|
# Alle Attribute des Models ermitteln (außer interne/spezielle Attribute)
|
|
def get_model_attributes(model_class):
|
|
return [attr for attr in dir(model_class)
|
|
if not callable(getattr(model_class, attr))
|
|
and not attr.startswith('_')
|
|
and attr != 'metadata'
|
|
and attr != 'query'
|
|
and attr != 'query_class'
|
|
and attr != 'label'
|
|
and attr != 'field_labels']
|
|
|
|
# Modell-Attribute für Mandate
|
|
mandate_attributes = get_model_attributes(Mandate)
|
|
|
|
@dataclass
|
|
class AppContext:
|
|
"""Kontext-Objekt für alle benötigten Verbindungen und Benutzerinformationen"""
|
|
mandate_id: int
|
|
user_id: int
|
|
interface_data: Any # Gateway Interface
|
|
|
|
async def get_context(current_user: Dict[str, Any]) -> AppContext:
|
|
"""
|
|
Erstellt ein zentrales Kontext-Objekt mit allen benötigten Interfaces
|
|
|
|
Args:
|
|
current_user: Aktueller Benutzer aus der Authentifizierung
|
|
|
|
Returns:
|
|
AppContext-Objekt mit allen benötigten Verbindungen
|
|
"""
|
|
mandate_id, user_id = await get_user_context(current_user)
|
|
interface_data = get_gateway_interface(mandate_id, user_id)
|
|
|
|
return AppContext(
|
|
mandate_id=mandate_id,
|
|
user_id=user_id,
|
|
interface_data=interface_data
|
|
)
|
|
|
|
# Router für Mandanten-Endpunkte erstellen
|
|
router = APIRouter(
|
|
prefix="/api/mandates",
|
|
tags=["Mandates"],
|
|
responses={404: {"description": "Not found"}}
|
|
)
|
|
|
|
@router.get("", response_model=List[Dict[str, Any]])
|
|
async def get_mandates(current_user: Dict[str, Any] = Depends(get_current_active_user)):
|
|
"""Alle verfügbaren Mandanten abrufen (nur für SysAdmin-Benutzer)"""
|
|
context = await get_context(current_user)
|
|
|
|
# Berechtigungsprüfung
|
|
if current_user.get("privilege") != "sysadmin":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Nur System-Administratoren können alle Mandanten abrufen"
|
|
)
|
|
|
|
# Mandanten generisch abrufen
|
|
return context.interface_data.get_all_mandates()
|
|
|
|
|
|
@router.post("", response_model=Dict[str, Any])
|
|
async def create_mandate(
|
|
mandate: Dict[str, Any] = Body(...),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Einen neuen Mandanten erstellen (nur für SysAdmin-Benutzer)"""
|
|
context = await get_context(current_user)
|
|
|
|
# Berechtigungsprüfung
|
|
if current_user.get("privilege") != "sysadmin":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Nur System-Administratoren können Mandanten erstellen"
|
|
)
|
|
|
|
# Attribute aus dem Request dynamisch setzen
|
|
mandate_data = {}
|
|
for attr in mandate_attributes:
|
|
if attr in mandate:
|
|
mandate_data[attr] = mandate[attr]
|
|
|
|
# Standardwerte für fehlende Felder
|
|
if "name" not in mandate_data:
|
|
mandate_data["name"] = "Neuer Mandant"
|
|
if "language" not in mandate_data:
|
|
mandate_data["language"] = "de"
|
|
|
|
# Mandant erstellen
|
|
new_mandate = context.interface_data.create_mandate(**mandate_data)
|
|
|
|
return new_mandate
|
|
|
|
|
|
@router.get("/{mandate_id}", response_model=Dict[str, Any])
|
|
async def get_mandate(
|
|
mandate_id: int,
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Einen bestimmten Mandanten abrufen"""
|
|
context = await get_context(current_user)
|
|
|
|
# Berechtigungsprüfung
|
|
# Admin darf nur seinen eigenen Mandanten sehen, SysAdmin alle
|
|
is_admin = current_user.get("privilege") == "admin"
|
|
is_sysadmin = current_user.get("privilege") == "sysadmin"
|
|
is_own_mandate = context.mandate_id == mandate_id
|
|
|
|
if (is_admin and not is_own_mandate) and not is_sysadmin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Keine Berechtigung zum Abrufen dieses Mandanten"
|
|
)
|
|
|
|
# Mandant generisch abrufen
|
|
mandate = context.interface_data.get_mandate(mandate_id)
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandant mit ID {mandate_id} nicht gefunden"
|
|
)
|
|
|
|
return mandate
|
|
|
|
@router.put("/{mandate_id}", response_model=Dict[str, Any])
|
|
async def update_mandate(
|
|
mandate_id: int = Path(..., description="ID des zu aktualisierenden Mandanten"),
|
|
mandate_data: Dict[str, Any] = Body(..., description="Aktualisierte Mandantendaten"),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Einen bestehenden Mandanten aktualisieren"""
|
|
context = await get_context(current_user)
|
|
|
|
# Mandant existiert?
|
|
mandate = context.interface_data.get_mandate(mandate_id)
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandant mit ID {mandate_id} nicht gefunden"
|
|
)
|
|
|
|
# Berechtigungsprüfung
|
|
is_admin = current_user.get("privilege") == "admin"
|
|
is_sysadmin = current_user.get("privilege") == "sysadmin"
|
|
is_own_mandate = context.mandate_id == mandate_id
|
|
|
|
if (is_admin and not is_own_mandate) and not is_sysadmin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Keine Berechtigung zum Aktualisieren dieses Mandanten"
|
|
)
|
|
|
|
# Attribute aus dem Request dynamisch in update_data filtern
|
|
update_data = {}
|
|
for attr in mandate_attributes:
|
|
if attr in mandate_data:
|
|
update_data[attr] = mandate_data[attr]
|
|
|
|
# Mandant aktualisieren
|
|
updated_mandate = context.interface_data.update_mandate(
|
|
mandate_id=mandate_id,
|
|
mandate_data=update_data
|
|
)
|
|
|
|
return updated_mandate
|
|
|
|
@router.delete("/{mandate_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
async def delete_mandate(
|
|
mandate_id: int = Path(..., description="ID des zu löschenden Mandanten"),
|
|
current_user: Dict[str, Any] = Depends(get_current_active_user)
|
|
):
|
|
"""Einen Mandanten löschen, inklusive aller zugehörigen Benutzer und referenzierten Objekte"""
|
|
context = await get_context(current_user)
|
|
|
|
# Mandant existiert?
|
|
mandate = context.interface_data.get_mandate(mandate_id)
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandant mit ID {mandate_id} nicht gefunden"
|
|
)
|
|
|
|
# Berechtigungsprüfung
|
|
is_admin = current_user.get("privilege") == "admin"
|
|
is_sysadmin = current_user.get("privilege") == "sysadmin"
|
|
is_own_mandate = context.mandate_id == mandate_id
|
|
|
|
if (is_admin and not is_own_mandate) and not is_sysadmin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Keine Berechtigung zum Löschen dieses Mandanten"
|
|
)
|
|
|
|
# Mandant löschen
|
|
success = context.interface_data.delete_mandate(mandate_id)
|
|
if not success:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Fehler beim Löschen des Mandanten mit ID {mandate_id}"
|
|
)
|
|
|
|
# Kein Inhalt zurückgeben bei erfolgreichem Löschen
|
|
return None |