From 55b4bbaa728f060e005c1c703825aa125eb5d0c0 Mon Sep 17 00:00:00 2001
From: valueon
Date: Tue, 18 Mar 2025 23:25:42 +0100
Subject: [PATCH] connectors for database json and mysql
---
gwserver/connector_db_json.py | 7 -
gwserver/connector_db_mysql.py | 454 +++++++++++++++++++++++++++++++++
requirements.txt | 3 +
3 files changed, 457 insertions(+), 7 deletions(-)
create mode 100644 gwserver/connector_db_mysql.py
diff --git a/gwserver/connector_db_json.py b/gwserver/connector_db_json.py
index 53dd6baa..f80076a4 100644
--- a/gwserver/connector_db_json.py
+++ b/gwserver/connector_db_json.py
@@ -107,17 +107,14 @@ class JSONDatabaseConnector:
# Wenn beides existiert, filtere entsprechend
if has_mandate and has_user:
- print(" DEBUG opt m+u: record +mandate:",record["mandate_id"])
if record["mandate_id"] == self.mandate_id:
filtered_records.append(record)
# Wenn nur mandate_id existiert
elif has_mandate and not has_user:
- print(" DEBUG opt m : record +mandate:",record["mandate_id"])
if record["mandate_id"] == self.mandate_id:
filtered_records.append(record)
# Wenn weder mandate_id noch user_id existieren, füge den Datensatz hinzu
elif not has_mandate and not has_user:
- print(" DEBUG opt ---: record +")
filtered_records.append(record)
return filtered_records
@@ -135,24 +132,20 @@ class JSONDatabaseConnector:
for field, value in record_filter.items():
# Prüfen, ob das Feld existiert
if field not in record:
- print(" - field not in record")
match = False
break
# Wenn der Filterwert ein Integer-String ist und das Datensatzfeld ein Integer
if isinstance(value, str) and value.isdigit() and isinstance(record[field], int):
if record[field] != int(value):
- print(" - ",record[field],"!=",int(value))
match = False
break
# Sonst direkter Vergleich
elif record[field] != value:
- print(" - ",record[field],"!='",value,"'")
match = False
break
if match:
- print(" + take it")
filtered_records.append(record)
return filtered_records
diff --git a/gwserver/connector_db_mysql.py b/gwserver/connector_db_mysql.py
new file mode 100644
index 00000000..c942bfc0
--- /dev/null
+++ b/gwserver/connector_db_mysql.py
@@ -0,0 +1,454 @@
+import os
+import logging
+from typing import List, Dict, Any, Optional, Union
+from datetime import datetime
+import mysql.connector
+from mysql.connector import Error
+
+
+logger = logging.getLogger(__name__)
+
+
+class MySQLDatabaseConnector:
+ """
+ Ein Konnektor für MySQL-basierte Datenspeicherung.
+ Stellt generische Datenbankoperationen bereit.
+ """
+
+ def __init__(self, db_host: str, db_name: str, db_user: str, db_password: str, mandate_id: int = None, user_id: int = None):
+ """
+ Initialisiert den MySQL-Datenbankkonnektor.
+
+ Args:
+ db_host: MySQL-Server Host
+ db_name: Name der Datenbank
+ db_user: Benutzername für die Authentifizierung
+ db_password: Passwort für die Authentifizierung
+ mandate_id: Kontext-Parameter für den Mandanten
+ user_id: Kontext-Parameter für den Benutzer
+ """
+ # Speichere die Eingabeparameter
+ self.db_host = db_host
+ self.db_name = db_name
+ self.db_user = db_user
+ self.db_password = db_password
+
+ # Prüfe, ob Kontext-Parameter gesetzt sind
+ if mandate_id is None or user_id is None:
+ raise ValueError("mandate_id und user_id müssen gesetzt sein")
+
+ self.mandate_id = mandate_id
+ self.user_id = user_id
+
+ # Stelle Verbindung zur Datenbank her
+ self.connection = self._create_connection()
+
+ logger.info(f"MySQLDatabaseConnector initialisiert für Datenbank: {db_name}")
+ logger.info(f"Kontext: mandate_id={mandate_id}, user_id={user_id}")
+
+ def _create_connection(self):
+ """Erstellt eine Verbindung zur MySQL-Datenbank"""
+ try:
+ connection = mysql.connector.connect(
+ host=self.db_host,
+ database=self.db_name,
+ user=self.db_user,
+ password=self.db_password
+ )
+ if connection.is_connected():
+ logger.info(f"Verbunden mit MySQL-Server Version {connection.get_server_info()}")
+ return connection
+ except Error as e:
+ logger.error(f"Fehler bei der Verbindung zu MySQL: {e}")
+ raise
+
+ def _execute_query(self, query: str, params: tuple = None):
+ """Führt eine SQL-Abfrage aus"""
+ cursor = None
+ try:
+ cursor = self.connection.cursor(dictionary=True)
+ cursor.execute(query, params)
+ return cursor
+ except Error as e:
+ logger.error(f"Fehler bei der Ausführung der Abfrage: {e}")
+ raise
+ finally:
+ if cursor:
+ cursor.close()
+
+ def _execute_select(self, query: str, params: tuple = None) -> List[Dict[str, Any]]:
+ """Führt eine SELECT-Abfrage aus und gibt die Ergebnisse zurück"""
+ cursor = None
+ try:
+ cursor = self.connection.cursor(dictionary=True)
+ cursor.execute(query, params)
+ result = cursor.fetchall()
+ return result
+ except Error as e:
+ logger.error(f"Fehler bei der Ausführung der SELECT-Abfrage: {e}")
+ raise
+ finally:
+ if cursor:
+ cursor.close()
+
+ def _execute_insert(self, query: str, params: tuple = None) -> int:
+ """Führt eine INSERT-Abfrage aus und gibt die ID des eingefügten Datensatzes zurück"""
+ cursor = None
+ try:
+ cursor = self.connection.cursor()
+ cursor.execute(query, params)
+ self.connection.commit()
+ return cursor.lastrowid
+ except Error as e:
+ logger.error(f"Fehler bei der Ausführung der INSERT-Abfrage: {e}")
+ self.connection.rollback()
+ raise
+ finally:
+ if cursor:
+ cursor.close()
+
+ def _execute_update(self, query: str, params: tuple = None) -> int:
+ """Führt eine UPDATE-Abfrage aus und gibt die Anzahl der betroffenen Zeilen zurück"""
+ cursor = None
+ try:
+ cursor = self.connection.cursor()
+ cursor.execute(query, params)
+ self.connection.commit()
+ return cursor.rowcount
+ except Error as e:
+ logger.error(f"Fehler bei der Ausführung der UPDATE-Abfrage: {e}")
+ self.connection.rollback()
+ raise
+ finally:
+ if cursor:
+ cursor.close()
+
+ def _execute_delete(self, query: str, params: tuple = None) -> int:
+ """Führt eine DELETE-Abfrage aus und gibt die Anzahl der gelöschten Zeilen zurück"""
+ cursor = None
+ try:
+ cursor = self.connection.cursor()
+ cursor.execute(query, params)
+ self.connection.commit()
+ return cursor.rowcount
+ except Error as e:
+ logger.error(f"Fehler bei der Ausführung der DELETE-Abfrage: {e}")
+ self.connection.rollback()
+ raise
+ finally:
+ if cursor:
+ cursor.close()
+
+ def _apply_record_filter(self, record_filter: Dict[str, Any] = None) -> str:
+ """Erstellt eine WHERE-Klausel basierend auf dem Datensatzfilter"""
+ if not record_filter:
+ return "WHERE 1=1"
+
+ conditions = []
+ params = []
+
+ for field, value in record_filter.items():
+ conditions.append(f"{field} = %s")
+ params.append(value)
+
+ where_clause = "WHERE " + " AND ".join(conditions)
+
+ return where_clause, tuple(params)
+
+ def _get_context_filter(self) -> tuple:
+ """Erstellt eine WHERE-Klausel für den Mandanten- und Benutzerkontext"""
+ return "WHERE mandate_id = %s", (self.mandate_id,)
+
+ # Public API
+
+ def get_tables(self, filter_criteria: Dict[str, Any] = None) -> List[str]:
+ """
+ Gibt eine Liste aller verfügbaren Tabellen zurück.
+
+ Args:
+ filter_criteria: Optionale Filterkriterien (nicht implementiert)
+
+ Returns:
+ Liste der Tabellennamen
+ """
+ query = """
+ SELECT table_name
+ FROM information_schema.tables
+ WHERE table_schema = %s
+ """
+
+ try:
+ result = self._execute_select(query, (self.db_name,))
+ return [row["table_name"] for row in result]
+ except Exception as e:
+ logger.error(f"Fehler beim Abrufen der Tabellen: {e}")
+ return []
+
+ def get_fields(self, table: str, filter_criteria: Dict[str, Any] = None) -> List[str]:
+ """
+ Gibt eine Liste aller Felder einer Tabelle zurück.
+
+ Args:
+ table: Name der Tabelle
+ filter_criteria: Optionale Filterkriterien (nicht implementiert)
+
+ Returns:
+ Liste der Feldnamen
+ """
+ query = """
+ SELECT column_name
+ FROM information_schema.columns
+ WHERE table_schema = %s AND table_name = %s
+ """
+
+ try:
+ result = self._execute_select(query, (self.db_name, table))
+ return [row["column_name"] for row in result]
+ except Exception as e:
+ logger.error(f"Fehler beim Abrufen der Felder für Tabelle {table}: {e}")
+ return []
+
+ def get_schema(self, table: str, language: str = None, filter_criteria: Dict[str, Any] = None) -> Dict[str, Dict[str, Any]]:
+ """
+ Gibt ein Schema-Objekt für eine Tabelle zurück mit Datentypen und Labels.
+
+ Args:
+ table: Name der Tabelle
+ language: Sprache für die Labels (optional)
+ filter_criteria: Optionale Filterkriterien (nicht implementiert)
+
+ Returns:
+ Schema-Objekt mit Feldern, Datentypen und Labels
+ """
+ query = """
+ SELECT
+ column_name,
+ data_type,
+ column_comment
+ FROM
+ information_schema.columns
+ WHERE
+ table_schema = %s AND table_name = %s
+ """
+
+ schema = {}
+
+ try:
+ result = self._execute_select(query, (self.db_name, table))
+
+ for row in result:
+ field = row["column_name"]
+ data_type = row["data_type"]
+ comment = row["column_comment"]
+
+ # Label erstellen (Standardwert ist der Feldname)
+ label = field
+
+ # Wenn ein Kommentar existiert, verwende diesen als Label
+ if comment:
+ label = comment
+
+ schema[field] = {
+ "type": data_type,
+ "label": label
+ }
+
+ return schema
+ except Exception as e:
+ logger.error(f"Fehler beim Abrufen des Schemas für Tabelle {table}: {e}")
+ return {}
+
+ def get_recordset(self, table: str, field_filter: List[str] = None, record_filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
+ """
+ Gibt eine Liste von Datensätzen aus einer Tabelle zurück, gefiltert nach Kriterien.
+
+ Args:
+ table: Name der Tabelle
+ field_filter: Filter für Felder (welche Felder zurückgegeben werden sollen)
+ record_filter: Filter für Datensätze (welche Datensätze zurückgegeben werden sollen)
+
+ Returns:
+ Liste der gefilterten Datensätze
+ """
+ # Bestimme die Felder für die Abfrage
+ fields = "*"
+ if field_filter and isinstance(field_filter, list):
+ fields = ", ".join(field_filter)
+
+ # Basisbedingung ist der Mandantenkontext
+ base_where, base_params = self._get_context_filter()
+
+ # Wende zusätzliche Filterbedingungen an, wenn vorhanden
+ additional_where = ""
+ additional_params = ()
+
+ if record_filter:
+ additional_where, additional_params = self._apply_record_filter(record_filter)
+ # Entferne das "WHERE" am Anfang und ersetze es durch "AND"
+ additional_where = " AND " + additional_where[6:]
+
+ # Kombiniere die Bedingungen und Parameter
+ where_clause = base_where + additional_where
+ params = base_params + additional_params
+
+ # Erstelle die vollständige Abfrage
+ query = f"""
+ SELECT {fields} FROM {table} {where_clause}
+ """
+
+ try:
+ return self._execute_select(query, params)
+ except Exception as e:
+ logger.error(f"Fehler beim Abrufen der Datensätze aus Tabelle {table}: {e}")
+ return []
+
+ def record_create(self, table: str, record_data: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Erstellt einen neuen Datensatz in der Tabelle.
+
+ Args:
+ table: Name der Tabelle
+ record_data: Daten für den neuen Datensatz
+
+ Returns:
+ Der erstellte Datensatz
+ """
+ # Füge mandate_id und user_id hinzu, falls nicht vorhanden
+ if "mandate_id" not in record_data:
+ record_data["mandate_id"] = self.mandate_id
+
+ if "user_id" not in record_data:
+ record_data["user_id"] = self.user_id
+
+ # Erstelle die Abfrage
+ fields = ", ".join(record_data.keys())
+ placeholders = ", ".join(["%s"] * len(record_data))
+ values = tuple(record_data.values())
+
+ query = f"""
+ INSERT INTO {table} ({fields})
+ VALUES ({placeholders})
+ """
+
+ try:
+ # Führe die Abfrage aus und erhalte die ID des neuen Datensatzes
+ new_id = self._execute_insert(query, values)
+
+ # Füge die ID zum Datensatz hinzu, falls eine zurückgegeben wurde
+ if new_id:
+ record_data["id"] = new_id
+
+ return record_data
+ except Exception as e:
+ logger.error(f"Fehler beim Erstellen des Datensatzes in Tabelle {table}: {e}")
+ raise ValueError(f"Fehler beim Erstellen des Datensatzes in Tabelle {table}")
+
+ def record_delete(self, table: str, record_id: Union[str, int]) -> bool:
+ """
+ Löscht einen Datensatz aus der Tabelle.
+
+ Args:
+ table: Name der Tabelle
+ record_id: ID des zu löschenden Datensatzes
+
+ Returns:
+ True bei Erfolg, False bei Fehler
+ """
+ # Prüfe zuerst, ob der Datensatz zum aktuellen Mandanten gehört
+ check_query = f"""
+ SELECT mandate_id FROM {table} WHERE id = %s
+ """
+
+ try:
+ result = self._execute_select(check_query, (record_id,))
+
+ if not result:
+ # Datensatz nicht gefunden
+ return False
+
+ if result[0]["mandate_id"] != self.mandate_id:
+ raise ValueError("Not your mandate")
+
+ # Lösche den Datensatz
+ delete_query = f"""
+ DELETE FROM {table} WHERE id = %s AND mandate_id = %s
+ """
+
+ rows_affected = self._execute_delete(delete_query, (record_id, self.mandate_id))
+
+ return rows_affected > 0
+ except Exception as e:
+ logger.error(f"Fehler beim Löschen des Datensatzes aus Tabelle {table}: {e}")
+ return False
+
+ def record_modify(self, table: str, record_id: Union[str, int], record_data: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Ändert einen Datensatz in der Tabelle.
+
+ Args:
+ table: Name der Tabelle
+ record_id: ID des zu ändernden Datensatzes
+ record_data: Neue Daten für den Datensatz
+
+ Returns:
+ Der aktualisierte Datensatz
+ """
+ # Prüfe zuerst, ob der Datensatz zum aktuellen Mandanten gehört
+ check_query = f"""
+ SELECT mandate_id FROM {table} WHERE id = %s
+ """
+
+ try:
+ result = self._execute_select(check_query, (record_id,))
+
+ if not result:
+ # Datensatz nicht gefunden
+ raise ValueError(f"Datensatz mit ID {record_id} nicht gefunden in Tabelle {table}")
+
+ if result[0]["mandate_id"] != self.mandate_id:
+ raise ValueError("Not your mandate")
+
+ # Erstelle die SET-Klausel und Parameter für das Update
+ set_clauses = []
+ values = []
+
+ for key, value in record_data.items():
+ set_clauses.append(f"{key} = %s")
+ values.append(value)
+
+ set_clause = ", ".join(set_clauses)
+ values.append(record_id) # Für die WHERE-Bedingung
+ values.append(self.mandate_id) # Für die mandate_id-Bedingung
+
+ # Aktualisiere den Datensatz
+ update_query = f"""
+ UPDATE {table}
+ SET {set_clause}
+ WHERE id = %s AND mandate_id = %s
+ """
+
+ rows_affected = self._execute_update(update_query, tuple(values))
+
+ if rows_affected > 0:
+ # Lade den aktualisierten Datensatz
+ get_query = f"""
+ SELECT * FROM {table} WHERE id = %s
+ """
+
+ updated_record = self._execute_select(get_query, (record_id,))
+
+ if updated_record:
+ return updated_record[0]
+ else:
+ raise ValueError(f"Fehler beim Abrufen des aktualisierten Datensatzes aus Tabelle {table}")
+ else:
+ raise ValueError(f"Fehler beim Aktualisieren des Datensatzes in Tabelle {table}")
+ except Exception as e:
+ logger.error(f"Fehler beim Aktualisieren des Datensatzes in Tabelle {table}: {e}")
+ raise
+
+ def close(self):
+ """Schließt die Datenbankverbindung"""
+ if hasattr(self, 'connection') and self.connection.is_connected():
+ self.connection.close()
+ logger.info("Datenbankverbindung geschlossen")
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index f33a0e8e..8e2ea14c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,6 +35,9 @@ mypy>=1.2.0 # Type checking
# JSON DB Connector
jsonpickle>=3.0.1 # For advanced JSON serialization
+# mySQL Connector
+mysql-connector-python>=8.0.23
+
# === LucyDOM Interface Dependencies ===
# DataFrame and Data Processing