gateway/routes/users.py
2025-04-19 01:02:46 +02:00

265 lines
No EOL
9.9 KiB
Python

from fastapi import APIRouter, HTTPException, Depends, Body, Path
from typing import List, Dict, Any, Optional
from fastapi import status
from datetime import datetime
from dataclasses import dataclass
# Import auth module
from modules.auth import get_current_active_user, get_user_context
# Import interfaces
from modules.gateway_interface import get_gateway_interface
from modules.gateway_model import User
# Alle Attribute des Models ermitteln (außer interne/spezielle Attribute)
def get_model_attributes(model_class):
return [attr for attr in dir(model_class)
if not callable(getattr(model_class, attr))
and not attr.startswith('_')
and attr != 'metadata'
and attr != 'query'
and attr != 'query_class'
and attr != 'label'
and attr != 'field_labels']
# Modell-Attribute für User
user_attributes = get_model_attributes(User)
@dataclass
class AppContext:
"""Kontext-Objekt für alle benötigten Verbindungen und Benutzerinformationen"""
mandate_id: int
user_id: int
interface_data: Any # Gateway Interface
async def get_context(current_user: Dict[str, Any]) -> AppContext:
"""
Erstellt ein zentrales Kontext-Objekt mit allen benötigten Interfaces
Args:
current_user: Aktueller Benutzer aus der Authentifizierung
Returns:
AppContext-Objekt mit allen benötigten Verbindungen
"""
mandate_id, user_id = await get_user_context(current_user)
interface_data = get_gateway_interface(mandate_id, user_id)
return AppContext(
mandate_id=mandate_id,
user_id=user_id,
interface_data=interface_data
)
# Router für Benutzer-Endpunkte erstellen
router = APIRouter(
prefix="/api/users",
tags=["Users"],
responses={404: {"description": "Not found"}}
)
@router.get("", response_model=List[Dict[str, Any]])
async def get_users(current_user: Dict[str, Any] = Depends(get_current_active_user)):
"""Alle verfügbaren Benutzer abrufen (nur für Admin/SysAdmin-Benutzer)"""
context = await get_context(current_user)
# Berechtigungsprüfung
if current_user.get("privilege") not in ["admin", "sysadmin"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Abrufen der Benutzerliste"
)
# Admin sieht nur Benutzer des eigenen Mandanten, SysAdmin sieht alle
if current_user.get("privilege") == "admin":
return context.interface_data.get_users_by_mandate(context.mandate_id)
else: # sysadmin
return context.interface_data.get_all_users()
@router.post("/register", response_model=Dict[str, Any])
async def register_user(user_data: dict = Body(...)):
"""Neuen Benutzer registrieren"""
# Bei der Registrierung keinen Benutzerkontext verwenden
gateway = get_gateway_interface()
if "username" not in user_data or "password" not in user_data:
raise HTTPException(status_code=400, detail="Benutzername und Passwort erforderlich")
try:
# Attribute dynamisch filtern
mandate_data = {
"name": f"Mandant von {user_data['username']}",
"language": user_data.get("language", "de")
}
new_mandate = gateway.create_mandate(**mandate_data)
# User-Attribute aus dem Request filtern
user_create_data = {}
for attr in user_attributes:
if attr in user_data and attr not in ["id"]: # ID wird automatisch vergeben
user_create_data[attr] = user_data[attr]
# Pflichtfelder
user_create_data["username"] = user_data["username"]
user_create_data["password"] = user_data["password"]
user_create_data["mandate_id"] = new_mandate["id"]
# Standardwerte für optionale Felder
if "disabled" not in user_create_data:
user_create_data["disabled"] = False
if "privilege" not in user_create_data:
user_create_data["privilege"] = "user"
if "language" not in user_create_data:
user_create_data["language"] = "de"
new_user = gateway.create_user(**user_create_data)
return new_user
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/{user_id}", response_model=Dict[str, Any])
async def get_user(
user_id: int,
current_user: Dict[str, Any] = Depends(get_current_active_user)
):
"""Einen bestimmten Benutzer abrufen"""
context = await get_context(current_user)
# Gateway-Interface mit Benutzerkontext initialisieren
user_to_get = context.interface_data.get_user(user_id)
if not user_to_get:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Benutzer mit ID {user_id} nicht gefunden"
)
# Berechtigungsprüfung
# Benutzer darf nur sich selbst abrufen, Admin nur Benutzer des eigenen Mandanten, SysAdmin alle
if user_id == context.user_id:
# Benutzer darf sich selbst abrufen
pass
elif current_user.get("privilege") == "admin" and user_to_get.get("mandate_id") == context.mandate_id:
# Admin darf Benutzer des eigenen Mandanten abrufen
pass
elif current_user.get("privilege") == "sysadmin":
# SysAdmin darf alle Benutzer abrufen
pass
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Abrufen dieses Benutzers"
)
return user_to_get
@router.put("/{user_id}", response_model=Dict[str, Any])
async def update_user(
user_id: int = Path(..., description="ID des zu aktualisierenden Benutzers"),
user_data: Dict[str, Any] = Body(..., description="Aktualisierte Benutzerdaten"),
current_user: Dict[str, Any] = Depends(get_current_active_user)
):
"""Einen bestehenden Benutzer aktualisieren"""
context = await get_context(current_user)
# Benutzer existiert?
user_to_update = context.interface_data.get_user(user_id)
if not user_to_update:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Benutzer mit ID {user_id} nicht gefunden"
)
# Berechtigungsprüfung
is_self_update = user_id == context.user_id
is_admin = current_user.get("privilege") == "admin"
is_sysadmin = current_user.get("privilege") == "sysadmin"
same_mandate = user_to_update.get("mandate_id") == context.mandate_id
# Filtere erlaubte Felder je nach Berechtigungsstufe
allowed_fields = {"username", "email", "full_name", "language"}
sensitive_fields = {"mandate_id", "disabled", "privilege"}
# Prüfe, ob sensitive Felder geändert werden sollen
sensitive_update = any(field in user_data for field in sensitive_fields)
if is_self_update and sensitive_update:
# Normale Benutzer dürfen ihre sensitiven Daten nicht ändern
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Ändern sensitiver Benutzerdaten"
)
elif is_admin and sensitive_update and not same_mandate:
# Admins dürfen sensitive Daten nur für Benutzer des eigenen Mandanten ändern
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Ändern sensitiver Daten für Benutzer anderer Mandanten"
)
elif not (is_self_update or (is_admin and same_mandate) or is_sysadmin):
# Keine Berechtigung für andere Fälle
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Aktualisieren dieses Benutzers"
)
# Attribute aus dem Request dynamisch filtern
update_data = {}
for attr in user_attributes:
if attr in user_data and attr not in ["id"]: # ID kann nicht geändert werden
update_data[attr] = user_data[attr]
# Entferne nicht erlaubte Felder für normale Benutzer
if not (is_admin or is_sysadmin):
update_data = {k: v for k, v in update_data.items() if k in allowed_fields}
# User-Daten aktualisieren
updated_user = context.interface_data.update_user(user_id, update_data)
return updated_user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int = Path(..., description="ID des zu löschenden Benutzers"),
current_user: Dict[str, Any] = Depends(get_current_active_user)
):
"""Einen Benutzer löschen"""
context = await get_context(current_user)
# Benutzer existiert?
user_to_delete = context.interface_data.get_user(user_id)
if not user_to_delete:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Benutzer mit ID {user_id} nicht gefunden"
)
# Berechtigungsprüfung
is_self_delete = user_id == context.user_id
is_admin = current_user.get("privilege") == "admin"
is_sysadmin = current_user.get("privilege") == "sysadmin"
same_mandate = user_to_delete.get("mandate_id") == context.mandate_id
if is_self_delete:
# Benutzer darf sich selbst löschen
pass
elif is_admin and same_mandate:
# Admin darf Benutzer des eigenen Mandanten löschen
pass
elif is_sysadmin:
# SysAdmin darf alle Benutzer löschen
pass
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Keine Berechtigung zum Löschen dieses Benutzers"
)
# Benutzer und alle referenzierten Objekte löschen
success = context.interface_data.delete_user(user_id)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Fehler beim Löschen des Benutzers mit ID {user_id}"
)
# Kein Inhalt zurückgeben bei erfolgreichem Löschen
return None