gateway/gwserver/routes/users.py
2025-03-23 01:19:05 +01:00

214 lines
No EOL
8.1 KiB
Python

from fastapi import APIRouter, HTTPException, Depends, Body, Path
from typing import List, Dict, Any, Optional
from fastapi import status
# Import auth module
from auth import get_current_active_user, get_user_context
# Import interfaces
from modules.gateway_interface import get_gateway_interface
# Router für Benutzer-Endpunkte erstellen
router = APIRouter(
prefix="/api/users",
tags=["Users"],
responses={404: {"description": "Not found"}}
)
@router.get("", response_model=List[Dict[str, Any]])
async def get_users(current_user: Dict[str, Any] = Depends(get_current_active_user)):
"""Alle verfügbaren Benutzer abrufen (nur für Admin/SysAdmin-Benutzer)"""
mandate_id, user_id = await get_user_context(current_user)
# Gateway-Interface mit Benutzerkontext initialisieren
gateway = get_gateway_interface(mandate_id, user_id)
# Berechtigungsprüfung
if current_user.get("privilege") not in ["admin", "sysadmin"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Abrufen der Benutzerliste"
)
# Admin sieht nur Benutzer des eigenen Mandanten, SysAdmin sieht alle
if current_user.get("privilege") == "admin":
return gateway.get_users_by_mandate(mandate_id)
else: # sysadmin
return gateway.get_all_users()
@router.post("/register", response_model=Dict[str, Any])
async def register_user(user_data: dict = Body(...)):
"""Neuen Benutzer registrieren"""
# Bei der Registrierung keinen Benutzerkontext verwenden
gateway = get_gateway_interface()
if "username" not in user_data or "password" not in user_data:
raise HTTPException(status_code=400, detail="Benutzername und Passwort erforderlich")
try:
new_mandate = gateway.create_mandate(
name=f"Mandant von {user_data['username']}",
language=user_data.get("language", "de")
)
new_user = gateway.create_user(
username=user_data["username"],
password=user_data["password"],
email=user_data.get("email"),
full_name=user_data.get("full_name"),
language=user_data.get("language", "de"),
mandate_id=new_mandate["id"],
disabled=False,
privilege="user"
)
return new_user
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/{user_id}", response_model=Dict[str, Any])
async def get_user(
user_id: int,
current_user: Dict[str, Any] = Depends(get_current_active_user)
):
"""Einen bestimmten Benutzer abrufen"""
mandate_id, current_user_id = await get_user_context(current_user)
# Gateway-Interface mit Benutzerkontext initialisieren
gateway = get_gateway_interface(mandate_id, current_user_id)
# Berechtigungsprüfung
# Benutzer darf nur sich selbst abrufen, Admin nur Benutzer des eigenen Mandanten, SysAdmin alle
user_to_get = gateway.get_user(user_id)
if not user_to_get:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Benutzer mit ID {user_id} nicht gefunden"
)
if user_id == current_user_id:
# Benutzer darf sich selbst abrufen
pass
elif current_user.get("privilege") == "admin" and user_to_get.get("mandate_id") == mandate_id:
# Admin darf Benutzer des eigenen Mandanten abrufen
pass
elif current_user.get("privilege") == "sysadmin":
# SysAdmin darf alle Benutzer abrufen
pass
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Abrufen dieses Benutzers"
)
return user_to_get
@router.put("/{user_id}", response_model=Dict[str, Any])
async def update_user(
user_id: int = Path(..., description="ID des zu aktualisierenden Benutzers"),
user_data: Dict[str, Any] = Body(..., description="Aktualisierte Benutzerdaten"),
current_user: Dict[str, Any] = Depends(get_current_active_user)
):
"""Einen bestehenden Benutzer aktualisieren"""
mandate_id, current_user_id = await get_user_context(current_user)
# Gateway-Interface mit Benutzerkontext initialisieren
gateway = get_gateway_interface(mandate_id, current_user_id)
# Benutzer existiert?
user_to_update = gateway.get_user(user_id)
if not user_to_update:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Benutzer mit ID {user_id} nicht gefunden"
)
# Berechtigungsprüfung
is_self_update = user_id == current_user_id
is_admin = current_user.get("privilege") == "admin"
is_sysadmin = current_user.get("privilege") == "sysadmin"
same_mandate = user_to_update.get("mandate_id") == mandate_id
# Filtere erlaubte Felder je nach Berechtigungsstufe
allowed_fields = {"username", "email", "full_name", "language"}
sensitive_fields = {"mandate_id", "disabled", "privilege"}
# Prüfe, ob sensitive Felder geändert werden sollen
sensitive_update = any(field in user_data for field in sensitive_fields)
if is_self_update and sensitive_update:
# Normale Benutzer dürfen ihre sensitiven Daten nicht ändern
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Ändern sensitiver Benutzerdaten"
)
elif is_admin and sensitive_update and not same_mandate:
# Admins dürfen sensitive Daten nur für Benutzer des eigenen Mandanten ändern
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Ändern sensitiver Daten für Benutzer anderer Mandanten"
)
elif not (is_self_update or (is_admin and same_mandate) or is_sysadmin):
# Keine Berechtigung für andere Fälle
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Aktualisieren dieses Benutzers"
)
# Entferne nicht erlaubte Felder für normale Benutzer
if not (is_admin or is_sysadmin):
user_data = {k: v for k, v in user_data.items() if k in allowed_fields}
# User-Daten aktualisieren
updated_user = gateway.update_user(user_id, user_data)
return updated_user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int = Path(..., description="ID des zu löschenden Benutzers"),
current_user: Dict[str, Any] = Depends(get_current_active_user)
):
"""Einen Benutzer löschen"""
mandate_id, current_user_id = await get_user_context(current_user)
# Gateway-Interface mit Benutzerkontext initialisieren
gateway = get_gateway_interface(mandate_id, current_user_id)
# Benutzer existiert?
user_to_delete = gateway.get_user(user_id)
if not user_to_delete:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Benutzer mit ID {user_id} nicht gefunden"
)
# Berechtigungsprüfung
is_self_delete = user_id == current_user_id
is_admin = current_user.get("privilege") == "admin"
is_sysadmin = current_user.get("privilege") == "sysadmin"
same_mandate = user_to_delete.get("mandate_id") == mandate_id
if is_self_delete:
# Benutzer darf sich selbst löschen
pass
elif is_admin and same_mandate:
# Admin darf Benutzer des eigenen Mandanten löschen
pass
elif is_sysadmin:
# SysAdmin darf alle Benutzer löschen
pass
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Löschen dieses Benutzers"
)
# Benutzer und alle referenzierten Objekte löschen
success = gateway.delete_user(user_id)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Fehler beim Löschen des Benutzers mit ID {user_id}"
)
# Kein Inhalt zurückgeben bei erfolgreichem Löschen
return None