import os import logging from typing import Dict, Any, List, Optional, Union import importlib from passlib.context import CryptContext from connector_db_json import JSONDatabaseConnector from model_gateway import User, UserInDB, Token, Mandate logger = logging.getLogger(__name__) # Password-Hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class GatewayInterface: """ Interface zum Gateway-System. Verwaltet Benutzer und Mandanten. """ def __init__(self, mandate_id: int = None, user_id: int = None): """ Initialisiert das Gateway-Interface mit optionalem Mandanten- und Benutzerkontext. Args: mandate_id: ID des aktuellen Mandanten (optional) user_id: ID des aktuellen Benutzers (optional) """ # Bei der Initialisierung kann der Kontext leer sein self.mandate_id = mandate_id if mandate_id is not None else 1 # Root-Mandant als Standard self.user_id = user_id if user_id is not None else 1 # Admin-Benutzer als Standard # Datenverzeichnis self.data_folder = "/data_gateway" os.makedirs(self.data_folder, exist_ok=True) # Datenmodell-Modul importieren try: self.model_module = importlib.import_module("model_gateway") logger.info("model_gateway erfolgreich importiert") except ImportError as e: logger.error(f"Fehler beim Importieren von model_gateway: {e}") raise # Konnektor erstellen self.db = JSONDatabaseConnector( db_folder=self.data_folder, mandate_id=self.mandate_id, user_id=self.user_id ) # Datenbank initialisieren, falls nötig self._initialize_database() def _initialize_database(self): """ Initialisiert die Datenbank mit minimalen Objekten, falls sie noch nicht existiert. """ # Prüfe, ob Mandanten existieren mandates = self.db.get_recordset("mandates") # Erstelle den Root-Mandanten, falls nötig if not mandates: logger.info("Erstelle Root-Mandant") root_mandate = { "id": 1, "name": "Root", "language": "de" } self.db.record_create("mandates", root_mandate) logger.info("Root-Mandant wurde erstellt") # Prüfe, ob Benutzer existieren users = self.db.get_recordset("users") # Erstelle den Admin-Benutzer, falls nötig if not users: logger.info("Erstelle Admin-Benutzer") admin_user = { "id": 1, "mandate_id": 1, # Root-Mandant "username": "admin", "email": "admin@example.com", "full_name": "Administrator", "disabled": False, "language": "de", "hashed_password": self._get_password_hash("admin") # In der Produktion ein sicheres Passwort verwenden! } self.db.record_create("users", admin_user) logger.info("Admin-Benutzer wurde erstellt") def _get_password_hash(self, password: str) -> str: """Erstellt einen Hash für ein Passwort""" return pwd_context.hash(password) def _verify_password(self, plain_password: str, hashed_password: str) -> bool: """Überprüft, ob das Passwort zum Hash passt""" return pwd_context.verify(plain_password, hashed_password) def _get_current_timestamp(self) -> str: """Gibt den aktuellen Zeitstempel im ISO-Format zurück""" from datetime import datetime return datetime.now().isoformat() # Mandanten-Methoden def get_all_mandates(self) -> List[Dict[str, Any]]: """Gibt alle Mandanten zurück""" return self.db.get_recordset("mandates") def get_mandate(self, mandate_id: int) -> Optional[Dict[str, Any]]: """Gibt einen Mandanten anhand seiner ID zurück""" mandates = self.db.get_recordset("mandates", record_filter={"id": mandate_id}) if mandates: return mandates[0] return None def create_mandate(self, name: str, language: str = "de") -> Dict[str, Any]: """Erstellt einen neuen Mandanten""" # Bestimme die nächste ID mandates = self.db.get_recordset("mandates") next_id = 1 if mandates: next_id = max(mandate["id"] for mandate in mandates) + 1 mandate_data = { "id": next_id, "name": name, "language": language } return self.db.record_create("mandates", mandate_data) # Benutzer-Methoden def get_all_users(self) -> List[Dict[str, Any]]: """Gibt alle Benutzer des aktuellen Mandanten zurück""" return self.db.get_recordset("users") def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]: """Gibt einen Benutzer anhand seines Benutzernamens zurück""" users = self.db.get_recordset("users") for user in users: if user.get("username") == username: return user return None def get_user(self, user_id: int) -> Optional[Dict[str, Any]]: """Gibt einen Benutzer anhand seiner ID zurück""" users = self.db.get_recordset("users", record_filter={"id": user_id}) if users: return users[0] return None def create_user(self, username: str, password: str, email: str = None, full_name: str = None, language: str = "de") -> Dict[str, Any]: """Erstellt einen neuen Benutzer""" # Prüfe, ob der Benutzername bereits existiert existing_user = self.get_user_by_username(username) if existing_user: raise ValueError(f"Benutzer '{username}' existiert bereits") # Bestimme die nächste ID users = self.db.get_recordset("users") next_id = 1 if users: next_id = max(user["id"] for user in users) + 1 user_data = { "id": next_id, "mandate_id": self.mandate_id, "username": username, "email": email, "full_name": full_name, "disabled": False, "language": language, "hashed_password": self._get_password_hash(password) } created_user = self.db.record_create("users", user_data) # Entferne das Passwort-Hash aus der Rückgabe if "hashed_password" in created_user: del created_user["hashed_password"] return created_user def authenticate_user(self, username: str, password: str) -> Optional[Dict[str, Any]]: """Authentifiziert einen Benutzer anhand von Benutzername und Passwort""" user = self.get_user_by_username(username) if not user: return None if not self._verify_password(password, user.get("hashed_password", "")): return None # Erstelle eine Kopie ohne Passwort-Hash authenticated_user = {**user} if "hashed_password" in authenticated_user: del authenticated_user["hashed_password"] return authenticated_user def update_user(self, user_id: int, user_data: Dict[str, Any]) -> Dict[str, Any]: """Aktualisiert einen Benutzer""" # Hole den aktuellen Benutzer user = self.get_user(user_id) if not user: raise ValueError(f"Benutzer mit ID {user_id} nicht gefunden") # Wenn das Passwort geändert werden soll, hashe es if "password" in user_data: user_data["hashed_password"] = self._get_password_hash(user_data["password"]) del user_data["password"] # Aktualisiere den Benutzer updated_user = self.db.record_modify("users", user_id, user_data) # Entferne das Passwort-Hash aus der Rückgabe if "hashed_password" in updated_user: del updated_user["hashed_password"] return updated_user def disable_user(self, user_id: int) -> Dict[str, Any]: """Deaktiviert einen Benutzer""" return self.update_user(user_id, {"disabled": True}) def enable_user(self, user_id: int) -> Dict[str, Any]: """Aktiviert einen Benutzer""" return self.update_user(user_id, {"disabled": False}) # Singleton-Factory für GatewayInterface-Instanzen pro Kontext _gateway_interfaces = {} def get_gateway_interface(mandate_id: int = None, user_id: int = None) -> GatewayInterface: """ Gibt eine GatewayInterface-Instanz für den angegebenen Kontext zurück. Wiederverwendet bestehende Instanzen. """ context_key = f"{mandate_id}_{user_id}" if context_key not in _gateway_interfaces: _gateway_interfaces[context_key] = GatewayInterface(mandate_id, user_id) return _gateway_interfaces[context_key]