diff --git a/gwserver/connector_db_json.py b/gwserver/connector_db_json.py index 53dd6baa..f80076a4 100644 --- a/gwserver/connector_db_json.py +++ b/gwserver/connector_db_json.py @@ -107,17 +107,14 @@ class JSONDatabaseConnector: # Wenn beides existiert, filtere entsprechend if has_mandate and has_user: - print(" DEBUG opt m+u: record +mandate:",record["mandate_id"]) if record["mandate_id"] == self.mandate_id: filtered_records.append(record) # Wenn nur mandate_id existiert elif has_mandate and not has_user: - print(" DEBUG opt m : record +mandate:",record["mandate_id"]) if record["mandate_id"] == self.mandate_id: filtered_records.append(record) # Wenn weder mandate_id noch user_id existieren, füge den Datensatz hinzu elif not has_mandate and not has_user: - print(" DEBUG opt ---: record +") filtered_records.append(record) return filtered_records @@ -135,24 +132,20 @@ class JSONDatabaseConnector: for field, value in record_filter.items(): # Prüfen, ob das Feld existiert if field not in record: - print(" - field not in record") match = False break # Wenn der Filterwert ein Integer-String ist und das Datensatzfeld ein Integer if isinstance(value, str) and value.isdigit() and isinstance(record[field], int): if record[field] != int(value): - print(" - ",record[field],"!=",int(value)) match = False break # Sonst direkter Vergleich elif record[field] != value: - print(" - ",record[field],"!='",value,"'") match = False break if match: - print(" + take it") filtered_records.append(record) return filtered_records diff --git a/gwserver/connector_db_mysql.py b/gwserver/connector_db_mysql.py new file mode 100644 index 00000000..c942bfc0 --- /dev/null +++ b/gwserver/connector_db_mysql.py @@ -0,0 +1,454 @@ +import os +import logging +from typing import List, Dict, Any, Optional, Union +from datetime import datetime +import mysql.connector +from mysql.connector import Error + + +logger = logging.getLogger(__name__) + + +class MySQLDatabaseConnector: + """ + Ein Konnektor für MySQL-basierte Datenspeicherung. + Stellt generische Datenbankoperationen bereit. + """ + + def __init__(self, db_host: str, db_name: str, db_user: str, db_password: str, mandate_id: int = None, user_id: int = None): + """ + Initialisiert den MySQL-Datenbankkonnektor. + + Args: + db_host: MySQL-Server Host + db_name: Name der Datenbank + db_user: Benutzername für die Authentifizierung + db_password: Passwort für die Authentifizierung + mandate_id: Kontext-Parameter für den Mandanten + user_id: Kontext-Parameter für den Benutzer + """ + # Speichere die Eingabeparameter + self.db_host = db_host + self.db_name = db_name + self.db_user = db_user + self.db_password = db_password + + # Prüfe, ob Kontext-Parameter gesetzt sind + if mandate_id is None or user_id is None: + raise ValueError("mandate_id und user_id müssen gesetzt sein") + + self.mandate_id = mandate_id + self.user_id = user_id + + # Stelle Verbindung zur Datenbank her + self.connection = self._create_connection() + + logger.info(f"MySQLDatabaseConnector initialisiert für Datenbank: {db_name}") + logger.info(f"Kontext: mandate_id={mandate_id}, user_id={user_id}") + + def _create_connection(self): + """Erstellt eine Verbindung zur MySQL-Datenbank""" + try: + connection = mysql.connector.connect( + host=self.db_host, + database=self.db_name, + user=self.db_user, + password=self.db_password + ) + if connection.is_connected(): + logger.info(f"Verbunden mit MySQL-Server Version {connection.get_server_info()}") + return connection + except Error as e: + logger.error(f"Fehler bei der Verbindung zu MySQL: {e}") + raise + + def _execute_query(self, query: str, params: tuple = None): + """Führt eine SQL-Abfrage aus""" + cursor = None + try: + cursor = self.connection.cursor(dictionary=True) + cursor.execute(query, params) + return cursor + except Error as e: + logger.error(f"Fehler bei der Ausführung der Abfrage: {e}") + raise + finally: + if cursor: + cursor.close() + + def _execute_select(self, query: str, params: tuple = None) -> List[Dict[str, Any]]: + """Führt eine SELECT-Abfrage aus und gibt die Ergebnisse zurück""" + cursor = None + try: + cursor = self.connection.cursor(dictionary=True) + cursor.execute(query, params) + result = cursor.fetchall() + return result + except Error as e: + logger.error(f"Fehler bei der Ausführung der SELECT-Abfrage: {e}") + raise + finally: + if cursor: + cursor.close() + + def _execute_insert(self, query: str, params: tuple = None) -> int: + """Führt eine INSERT-Abfrage aus und gibt die ID des eingefügten Datensatzes zurück""" + cursor = None + try: + cursor = self.connection.cursor() + cursor.execute(query, params) + self.connection.commit() + return cursor.lastrowid + except Error as e: + logger.error(f"Fehler bei der Ausführung der INSERT-Abfrage: {e}") + self.connection.rollback() + raise + finally: + if cursor: + cursor.close() + + def _execute_update(self, query: str, params: tuple = None) -> int: + """Führt eine UPDATE-Abfrage aus und gibt die Anzahl der betroffenen Zeilen zurück""" + cursor = None + try: + cursor = self.connection.cursor() + cursor.execute(query, params) + self.connection.commit() + return cursor.rowcount + except Error as e: + logger.error(f"Fehler bei der Ausführung der UPDATE-Abfrage: {e}") + self.connection.rollback() + raise + finally: + if cursor: + cursor.close() + + def _execute_delete(self, query: str, params: tuple = None) -> int: + """Führt eine DELETE-Abfrage aus und gibt die Anzahl der gelöschten Zeilen zurück""" + cursor = None + try: + cursor = self.connection.cursor() + cursor.execute(query, params) + self.connection.commit() + return cursor.rowcount + except Error as e: + logger.error(f"Fehler bei der Ausführung der DELETE-Abfrage: {e}") + self.connection.rollback() + raise + finally: + if cursor: + cursor.close() + + def _apply_record_filter(self, record_filter: Dict[str, Any] = None) -> str: + """Erstellt eine WHERE-Klausel basierend auf dem Datensatzfilter""" + if not record_filter: + return "WHERE 1=1" + + conditions = [] + params = [] + + for field, value in record_filter.items(): + conditions.append(f"{field} = %s") + params.append(value) + + where_clause = "WHERE " + " AND ".join(conditions) + + return where_clause, tuple(params) + + def _get_context_filter(self) -> tuple: + """Erstellt eine WHERE-Klausel für den Mandanten- und Benutzerkontext""" + return "WHERE mandate_id = %s", (self.mandate_id,) + + # Public API + + def get_tables(self, filter_criteria: Dict[str, Any] = None) -> List[str]: + """ + Gibt eine Liste aller verfügbaren Tabellen zurück. + + Args: + filter_criteria: Optionale Filterkriterien (nicht implementiert) + + Returns: + Liste der Tabellennamen + """ + query = """ + SELECT table_name + FROM information_schema.tables + WHERE table_schema = %s + """ + + try: + result = self._execute_select(query, (self.db_name,)) + return [row["table_name"] for row in result] + except Exception as e: + logger.error(f"Fehler beim Abrufen der Tabellen: {e}") + return [] + + def get_fields(self, table: str, filter_criteria: Dict[str, Any] = None) -> List[str]: + """ + Gibt eine Liste aller Felder einer Tabelle zurück. + + Args: + table: Name der Tabelle + filter_criteria: Optionale Filterkriterien (nicht implementiert) + + Returns: + Liste der Feldnamen + """ + query = """ + SELECT column_name + FROM information_schema.columns + WHERE table_schema = %s AND table_name = %s + """ + + try: + result = self._execute_select(query, (self.db_name, table)) + return [row["column_name"] for row in result] + except Exception as e: + logger.error(f"Fehler beim Abrufen der Felder für Tabelle {table}: {e}") + return [] + + def get_schema(self, table: str, language: str = None, filter_criteria: Dict[str, Any] = None) -> Dict[str, Dict[str, Any]]: + """ + Gibt ein Schema-Objekt für eine Tabelle zurück mit Datentypen und Labels. + + Args: + table: Name der Tabelle + language: Sprache für die Labels (optional) + filter_criteria: Optionale Filterkriterien (nicht implementiert) + + Returns: + Schema-Objekt mit Feldern, Datentypen und Labels + """ + query = """ + SELECT + column_name, + data_type, + column_comment + FROM + information_schema.columns + WHERE + table_schema = %s AND table_name = %s + """ + + schema = {} + + try: + result = self._execute_select(query, (self.db_name, table)) + + for row in result: + field = row["column_name"] + data_type = row["data_type"] + comment = row["column_comment"] + + # Label erstellen (Standardwert ist der Feldname) + label = field + + # Wenn ein Kommentar existiert, verwende diesen als Label + if comment: + label = comment + + schema[field] = { + "type": data_type, + "label": label + } + + return schema + except Exception as e: + logger.error(f"Fehler beim Abrufen des Schemas für Tabelle {table}: {e}") + return {} + + def get_recordset(self, table: str, field_filter: List[str] = None, record_filter: Dict[str, Any] = None) -> List[Dict[str, Any]]: + """ + Gibt eine Liste von Datensätzen aus einer Tabelle zurück, gefiltert nach Kriterien. + + Args: + table: Name der Tabelle + field_filter: Filter für Felder (welche Felder zurückgegeben werden sollen) + record_filter: Filter für Datensätze (welche Datensätze zurückgegeben werden sollen) + + Returns: + Liste der gefilterten Datensätze + """ + # Bestimme die Felder für die Abfrage + fields = "*" + if field_filter and isinstance(field_filter, list): + fields = ", ".join(field_filter) + + # Basisbedingung ist der Mandantenkontext + base_where, base_params = self._get_context_filter() + + # Wende zusätzliche Filterbedingungen an, wenn vorhanden + additional_where = "" + additional_params = () + + if record_filter: + additional_where, additional_params = self._apply_record_filter(record_filter) + # Entferne das "WHERE" am Anfang und ersetze es durch "AND" + additional_where = " AND " + additional_where[6:] + + # Kombiniere die Bedingungen und Parameter + where_clause = base_where + additional_where + params = base_params + additional_params + + # Erstelle die vollständige Abfrage + query = f""" + SELECT {fields} FROM {table} {where_clause} + """ + + try: + return self._execute_select(query, params) + except Exception as e: + logger.error(f"Fehler beim Abrufen der Datensätze aus Tabelle {table}: {e}") + return [] + + def record_create(self, table: str, record_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Erstellt einen neuen Datensatz in der Tabelle. + + Args: + table: Name der Tabelle + record_data: Daten für den neuen Datensatz + + Returns: + Der erstellte Datensatz + """ + # Füge mandate_id und user_id hinzu, falls nicht vorhanden + if "mandate_id" not in record_data: + record_data["mandate_id"] = self.mandate_id + + if "user_id" not in record_data: + record_data["user_id"] = self.user_id + + # Erstelle die Abfrage + fields = ", ".join(record_data.keys()) + placeholders = ", ".join(["%s"] * len(record_data)) + values = tuple(record_data.values()) + + query = f""" + INSERT INTO {table} ({fields}) + VALUES ({placeholders}) + """ + + try: + # Führe die Abfrage aus und erhalte die ID des neuen Datensatzes + new_id = self._execute_insert(query, values) + + # Füge die ID zum Datensatz hinzu, falls eine zurückgegeben wurde + if new_id: + record_data["id"] = new_id + + return record_data + except Exception as e: + logger.error(f"Fehler beim Erstellen des Datensatzes in Tabelle {table}: {e}") + raise ValueError(f"Fehler beim Erstellen des Datensatzes in Tabelle {table}") + + def record_delete(self, table: str, record_id: Union[str, int]) -> bool: + """ + Löscht einen Datensatz aus der Tabelle. + + Args: + table: Name der Tabelle + record_id: ID des zu löschenden Datensatzes + + Returns: + True bei Erfolg, False bei Fehler + """ + # Prüfe zuerst, ob der Datensatz zum aktuellen Mandanten gehört + check_query = f""" + SELECT mandate_id FROM {table} WHERE id = %s + """ + + try: + result = self._execute_select(check_query, (record_id,)) + + if not result: + # Datensatz nicht gefunden + return False + + if result[0]["mandate_id"] != self.mandate_id: + raise ValueError("Not your mandate") + + # Lösche den Datensatz + delete_query = f""" + DELETE FROM {table} WHERE id = %s AND mandate_id = %s + """ + + rows_affected = self._execute_delete(delete_query, (record_id, self.mandate_id)) + + return rows_affected > 0 + except Exception as e: + logger.error(f"Fehler beim Löschen des Datensatzes aus Tabelle {table}: {e}") + return False + + def record_modify(self, table: str, record_id: Union[str, int], record_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Ändert einen Datensatz in der Tabelle. + + Args: + table: Name der Tabelle + record_id: ID des zu ändernden Datensatzes + record_data: Neue Daten für den Datensatz + + Returns: + Der aktualisierte Datensatz + """ + # Prüfe zuerst, ob der Datensatz zum aktuellen Mandanten gehört + check_query = f""" + SELECT mandate_id FROM {table} WHERE id = %s + """ + + try: + result = self._execute_select(check_query, (record_id,)) + + if not result: + # Datensatz nicht gefunden + raise ValueError(f"Datensatz mit ID {record_id} nicht gefunden in Tabelle {table}") + + if result[0]["mandate_id"] != self.mandate_id: + raise ValueError("Not your mandate") + + # Erstelle die SET-Klausel und Parameter für das Update + set_clauses = [] + values = [] + + for key, value in record_data.items(): + set_clauses.append(f"{key} = %s") + values.append(value) + + set_clause = ", ".join(set_clauses) + values.append(record_id) # Für die WHERE-Bedingung + values.append(self.mandate_id) # Für die mandate_id-Bedingung + + # Aktualisiere den Datensatz + update_query = f""" + UPDATE {table} + SET {set_clause} + WHERE id = %s AND mandate_id = %s + """ + + rows_affected = self._execute_update(update_query, tuple(values)) + + if rows_affected > 0: + # Lade den aktualisierten Datensatz + get_query = f""" + SELECT * FROM {table} WHERE id = %s + """ + + updated_record = self._execute_select(get_query, (record_id,)) + + if updated_record: + return updated_record[0] + else: + raise ValueError(f"Fehler beim Abrufen des aktualisierten Datensatzes aus Tabelle {table}") + else: + raise ValueError(f"Fehler beim Aktualisieren des Datensatzes in Tabelle {table}") + except Exception as e: + logger.error(f"Fehler beim Aktualisieren des Datensatzes in Tabelle {table}: {e}") + raise + + def close(self): + """Schließt die Datenbankverbindung""" + if hasattr(self, 'connection') and self.connection.is_connected(): + self.connection.close() + logger.info("Datenbankverbindung geschlossen") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f33a0e8e..8e2ea14c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,6 +35,9 @@ mypy>=1.2.0 # Type checking # JSON DB Connector jsonpickle>=3.0.1 # For advanced JSON serialization +# mySQL Connector +mysql-connector-python>=8.0.23 + # === LucyDOM Interface Dependencies === # DataFrame and Data Processing