675 lines
No EOL
25 KiB
Python
675 lines
No EOL
25 KiB
Python
import os
|
|
import logging
|
|
from typing import List, Dict, Any, Optional, Union
|
|
from datetime import datetime
|
|
import mysql.connector
|
|
from mysql.connector import Error
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DatabaseConnector:
|
|
"""
|
|
Ein Konnektor für MySQL-basierte Datenspeicherung.
|
|
Stellt generische Datenbankoperationen bereit.
|
|
"""
|
|
|
|
def __init__(self, db_host: str, db_database: str, db_user: str, db_password: str, mandate_id: int = None, user_id: int = None):
|
|
"""
|
|
Initialisiert den MySQL-Datenbankkonnektor.
|
|
|
|
Args:
|
|
db_host: MySQL-Server Host
|
|
db_database: Name der Datenbank
|
|
db_user: Benutzername für die Authentifizierung
|
|
db_password: Passwort für die Authentifizierung
|
|
mandate_id: Kontext-Parameter für den Mandanten
|
|
user_id: Kontext-Parameter für den Benutzer
|
|
"""
|
|
# Speichere die Eingabeparameter
|
|
self.db_host = db_host
|
|
self.db_database = db_database
|
|
self.db_user = db_user
|
|
self.db_password = db_password
|
|
|
|
# Prüfe, ob Kontext-Parameter gesetzt sind
|
|
if mandate_id is None or user_id is None:
|
|
raise ValueError("mandate_id und user_id müssen gesetzt sein")
|
|
|
|
# Stelle Verbindung zur Datenbank her
|
|
self.connection = self._create_connection()
|
|
|
|
# System-Tabelle initialisieren
|
|
self._system_table_name = "_system"
|
|
self._initialize_system_table()
|
|
|
|
# Temporär mandate_id und user_id speichern
|
|
self._mandate_id = mandate_id
|
|
self._user_id = user_id
|
|
|
|
# Wenn mandate_id oder user_id 0 sind, versuche die initialen IDs zu verwenden
|
|
if mandate_id == 0:
|
|
initial_mandate_id = self.get_initial_id("mandates")
|
|
if initial_mandate_id is not None:
|
|
self._mandate_id = initial_mandate_id
|
|
logger.info(f"Verwende initiale mandate_id: {initial_mandate_id} statt 0")
|
|
|
|
if user_id == 0:
|
|
initial_user_id = self.get_initial_id("users")
|
|
if initial_user_id is not None:
|
|
self._user_id = initial_user_id
|
|
logger.info(f"Verwende initiale user_id: {initial_user_id} statt 0")
|
|
|
|
# Setze die effektiven IDs als Eigenschaften
|
|
self.mandate_id = self._mandate_id
|
|
self.user_id = self._user_id
|
|
|
|
logger.info(f"DatabaseConnector initialisiert für Datenbank: {db_database}")
|
|
logger.info(f"Kontext: mandate_id={self.mandate_id}, user_id={self.user_id}")
|
|
|
|
def _create_connection(self):
|
|
"""Erstellt eine Verbindung zur MySQL-Datenbank"""
|
|
try:
|
|
connection = mysql.connector.connect(
|
|
host=self.db_host,
|
|
database=self.db_database,
|
|
user=self.db_user,
|
|
password=self.db_password
|
|
)
|
|
if connection.is_connected():
|
|
logger.info(f"Verbunden mit MySQL-Server Version {connection.get_server_info()}")
|
|
return connection
|
|
except Error as e:
|
|
logger.error(f"Fehler bei der Verbindung zu MySQL: {e}")
|
|
raise
|
|
|
|
def _initialize_system_table(self):
|
|
"""Initialisiert die System-Tabelle, falls sie noch nicht existiert."""
|
|
cursor = None
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
|
|
# Prüfe, ob die System-Tabelle existiert
|
|
cursor.execute(f"""
|
|
SELECT COUNT(*)
|
|
FROM information_schema.tables
|
|
WHERE table_schema = '{self.db_database}'
|
|
AND table_name = '{self._system_table_name}'
|
|
""")
|
|
|
|
if cursor.fetchone()[0] == 0:
|
|
# Erstelle die System-Tabelle
|
|
cursor.execute(f"""
|
|
CREATE TABLE {self._system_table_name} (
|
|
table_name VARCHAR(255) PRIMARY KEY,
|
|
initial_id INT NOT NULL
|
|
)
|
|
""")
|
|
self.connection.commit()
|
|
logger.info(f"System-Tabelle '{self._system_table_name}' erstellt")
|
|
except Error as e:
|
|
logger.error(f"Fehler beim Initialisieren der System-Tabelle: {e}")
|
|
if self.connection.is_connected():
|
|
self.connection.rollback()
|
|
raise
|
|
finally:
|
|
if cursor and cursor.is_connected():
|
|
cursor.close()
|
|
|
|
def _execute_query(self, query: str, params: tuple = None):
|
|
"""Führt eine SQL-Abfrage aus"""
|
|
cursor = None
|
|
try:
|
|
cursor = self.connection.cursor(dictionary=True)
|
|
cursor.execute(query, params)
|
|
return cursor
|
|
except Error as e:
|
|
logger.error(f"Fehler bei der Ausführung der Abfrage: {e}")
|
|
raise
|
|
finally:
|
|
if cursor:
|
|
cursor.close()
|
|
|
|
def _execute_select(self, query: str, params: tuple = None) -> List[Dict[str, Any]]:
|
|
"""Führt eine SELECT-Abfrage aus und gibt die Ergebnisse zurück"""
|
|
cursor = None
|
|
try:
|
|
cursor = self.connection.cursor(dictionary=True)
|
|
cursor.execute(query, params)
|
|
result = cursor.fetchall()
|
|
return result
|
|
except Error as e:
|
|
logger.error(f"Fehler bei der Ausführung der SELECT-Abfrage: {e}")
|
|
raise
|
|
finally:
|
|
if cursor:
|
|
cursor.close()
|
|
|
|
def _execute_insert(self, query: str, params: tuple = None) -> int:
|
|
"""Führt eine INSERT-Abfrage aus und gibt die ID des eingefügten Datensatzes zurück"""
|
|
cursor = None
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
cursor.execute(query, params)
|
|
self.connection.commit()
|
|
return cursor.lastrowid
|
|
except Error as e:
|
|
logger.error(f"Fehler bei der Ausführung der INSERT-Abfrage: {e}")
|
|
self.connection.rollback()
|
|
raise
|
|
finally:
|
|
if cursor:
|
|
cursor.close()
|
|
|
|
def _execute_update(self, query: str, params: tuple = None) -> int:
|
|
"""Führt eine UPDATE-Abfrage aus und gibt die Anzahl der betroffenen Zeilen zurück"""
|
|
cursor = None
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
cursor.execute(query, params)
|
|
self.connection.commit()
|
|
return cursor.rowcount
|
|
except Error as e:
|
|
logger.error(f"Fehler bei der Ausführung der UPDATE-Abfrage: {e}")
|
|
self.connection.rollback()
|
|
raise
|
|
finally:
|
|
if cursor:
|
|
cursor.close()
|
|
|
|
def _execute_delete(self, query: str, params: tuple = None) -> int:
|
|
"""Führt eine DELETE-Abfrage aus und gibt die Anzahl der gelöschten Zeilen zurück"""
|
|
cursor = None
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
cursor.execute(query, params)
|
|
self.connection.commit()
|
|
return cursor.rowcount
|
|
except Error as e:
|
|
logger.error(f"Fehler bei der Ausführung der DELETE-Abfrage: {e}")
|
|
self.connection.rollback()
|
|
raise
|
|
finally:
|
|
if cursor:
|
|
cursor.close()
|
|
|
|
def _apply_record_filter(self, record_filter: Dict[str, Any] = None) -> str:
|
|
"""Erstellt eine WHERE-Klausel basierend auf dem Datensatzfilter"""
|
|
if not record_filter:
|
|
return "WHERE 1=1"
|
|
|
|
conditions = []
|
|
params = []
|
|
|
|
for field, value in record_filter.items():
|
|
conditions.append(f"{field} = %s")
|
|
params.append(value)
|
|
|
|
where_clause = "WHERE " + " AND ".join(conditions)
|
|
|
|
return where_clause, tuple(params)
|
|
|
|
def _get_context_filter(self) -> tuple:
|
|
"""Erstellt eine WHERE-Klausel für den Mandanten- und Benutzerkontext"""
|
|
return "WHERE mandate_id = %s", (self.mandate_id,)
|
|
|
|
# Public API
|
|
|
|
def get_tables(self, filter_criteria: Dict[str, Any] = None) -> List[str]:
|
|
"""
|
|
Gibt eine Liste aller verfügbaren Tabellen zurück.
|
|
|
|
Args:
|
|
filter_criteria: Optionale Filterkriterien (nicht implementiert)
|
|
|
|
Returns:
|
|
Liste der Tabellennamen
|
|
"""
|
|
query = """
|
|
SELECT table_name
|
|
FROM information_schema.tables
|
|
WHERE table_schema = %s
|
|
AND table_name NOT LIKE '\_%'
|
|
"""
|
|
|
|
try:
|
|
result = self._execute_select(query, (self.db_database,))
|
|
return [row["table_name"] for row in result]
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Abrufen der Tabellen: {e}")
|
|
return []
|
|
|
|
def get_fields(self, table: str, filter_criteria: Dict[str, Any] = None) -> List[str]:
|
|
"""
|
|
Gibt eine Liste aller Felder einer Tabelle zurück.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
filter_criteria: Optionale Filterkriterien (nicht implementiert)
|
|
|
|
Returns:
|
|
Liste der Feldnamen
|
|
"""
|
|
query = """
|
|
SELECT column_name
|
|
FROM information_schema.columns
|
|
WHERE table_schema = %s AND table_name = %s
|
|
"""
|
|
|
|
try:
|
|
result = self._execute_select(query, (self.db_database, table))
|
|
return [row["column_name"] for row in result]
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Abrufen der Felder für Tabelle {table}: {e}")
|
|
return []
|
|
|
|
def get_schema(self, table: str, language: str = None, filter_criteria: Dict[str, Any] = None) -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Gibt ein Schema-Objekt für eine Tabelle zurück mit Datentypen und Labels.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
language: Sprache für die Labels (optional)
|
|
filter_criteria: Optionale Filterkriterien (nicht implementiert)
|
|
|
|
Returns:
|
|
Schema-Objekt mit Feldern, Datentypen und Labels
|
|
"""
|
|
query = """
|
|
SELECT
|
|
column_name,
|
|
data_type,
|
|
column_comment
|
|
FROM
|
|
information_schema.columns
|
|
WHERE
|
|
table_schema = %s AND table_name = %s
|
|
"""
|
|
|
|
schema = {}
|
|
|
|
try:
|
|
result = self._execute_select(query, (self.db_database, table))
|
|
|
|
for row in result:
|
|
field = row["column_name"]
|
|
data_type = row["data_type"]
|
|
comment = row["column_comment"]
|
|
|
|
# Label erstellen (Standardwert ist der Feldname)
|
|
label = field
|
|
|
|
# Wenn ein Kommentar existiert, verwende diesen als Label
|
|
if comment:
|
|
label = comment
|
|
|
|
schema[field] = {
|
|
"type": data_type,
|
|
"label": label
|
|
}
|
|
|
|
return schema
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Abrufen des Schemas für Tabelle {table}: {e}")
|
|
return {}
|
|
|
|
def get_recordset(self, table: str, field_filter: List[str] = None, record_filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
|
|
"""
|
|
Gibt eine Liste von Datensätzen aus einer Tabelle zurück, gefiltert nach Kriterien.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
field_filter: Filter für Felder (welche Felder zurückgegeben werden sollen)
|
|
record_filter: Filter für Datensätze (welche Datensätze zurückgegeben werden sollen)
|
|
|
|
Returns:
|
|
Liste der gefilterten Datensätze
|
|
"""
|
|
# Bestimme die Felder für die Abfrage
|
|
fields = "*"
|
|
if field_filter and isinstance(field_filter, list):
|
|
fields = ", ".join(field_filter)
|
|
|
|
# Basisbedingung ist der Mandantenkontext
|
|
base_where, base_params = self._get_context_filter()
|
|
|
|
# Wende zusätzliche Filterbedingungen an, wenn vorhanden
|
|
additional_where = ""
|
|
additional_params = ()
|
|
|
|
if record_filter:
|
|
additional_where, additional_params = self._apply_record_filter(record_filter)
|
|
# Entferne das "WHERE" am Anfang und ersetze es durch "AND"
|
|
additional_where = " AND " + additional_where[6:]
|
|
|
|
# Kombiniere die Bedingungen und Parameter
|
|
where_clause = base_where + additional_where
|
|
params = base_params + additional_params
|
|
|
|
# Erstelle die vollständige Abfrage
|
|
query = f"""
|
|
SELECT {fields} FROM {table} {where_clause}
|
|
"""
|
|
|
|
try:
|
|
return self._execute_select(query, params)
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Abrufen der Datensätze aus Tabelle {table}: {e}")
|
|
return []
|
|
|
|
def record_create(self, table: str, record_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Erstellt einen neuen Datensatz in der Tabelle.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
record_data: Daten für den neuen Datensatz
|
|
|
|
Returns:
|
|
Der erstellte Datensatz
|
|
"""
|
|
# Füge mandate_id und user_id hinzu, falls nicht vorhanden oder 0
|
|
if "mandate_id" not in record_data or record_data["mandate_id"] == 0:
|
|
record_data["mandate_id"] = self.mandate_id
|
|
|
|
if "user_id" not in record_data or record_data["user_id"] == 0:
|
|
record_data["user_id"] = self.user_id
|
|
|
|
# Erstelle die Abfrage
|
|
fields = ", ".join(record_data.keys())
|
|
placeholders = ", ".join(["%s"] * len(record_data))
|
|
values = tuple(record_data.values())
|
|
|
|
query = f"""
|
|
INSERT INTO {table} ({fields})
|
|
VALUES ({placeholders})
|
|
"""
|
|
|
|
try:
|
|
# Prüfe zuerst, ob die Tabelle leer ist
|
|
check_query = f"""
|
|
SELECT COUNT(*) as count FROM {table}
|
|
"""
|
|
count_result = self._execute_select(check_query)
|
|
is_empty = count_result[0]["count"] == 0
|
|
|
|
# Führe die Abfrage aus und erhalte die ID des neuen Datensatzes
|
|
new_id = self._execute_insert(query, values)
|
|
|
|
# Wenn die Tabelle vorher leer war, registriere die neue ID als initiale ID
|
|
if is_empty and new_id:
|
|
self.register_initial_id(table, new_id)
|
|
logger.info(f"Initiale ID {new_id} für Tabelle {table} registriert")
|
|
|
|
# Füge die ID zum Datensatz hinzu, falls eine zurückgegeben wurde
|
|
if new_id:
|
|
record_data["id"] = new_id
|
|
|
|
return record_data
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Erstellen des Datensatzes in Tabelle {table}: {e}")
|
|
raise ValueError(f"Fehler beim Erstellen des Datensatzes in Tabelle {table}")
|
|
|
|
def record_delete(self, table: str, record_id: Union[str, int]) -> bool:
|
|
"""
|
|
Löscht einen Datensatz aus der Tabelle.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
record_id: ID des zu löschenden Datensatzes
|
|
|
|
Returns:
|
|
True bei Erfolg, False bei Fehler
|
|
"""
|
|
# Prüfe, ob es sich um die initiale ID handelt
|
|
initial_id = self.get_initial_id(table)
|
|
if initial_id is not None and initial_id == record_id:
|
|
logger.warning(f"Versuch, den initialen Datensatz mit ID {record_id} aus Tabelle {table} zu löschen, wurde verhindert")
|
|
return False
|
|
|
|
# Prüfe zuerst, ob der Datensatz zum aktuellen Mandanten gehört
|
|
check_query = f"""
|
|
SELECT mandate_id FROM {table} WHERE id = %s
|
|
"""
|
|
|
|
try:
|
|
result = self._execute_select(check_query, (record_id,))
|
|
|
|
if not result:
|
|
# Datensatz nicht gefunden
|
|
return False
|
|
|
|
if result[0]["mandate_id"] != self.mandate_id:
|
|
raise ValueError("Not your mandate")
|
|
|
|
# Lösche den Datensatz
|
|
delete_query = f"""
|
|
DELETE FROM {table} WHERE id = %s AND mandate_id = %s
|
|
"""
|
|
|
|
rows_affected = self._execute_delete(delete_query, (record_id, self.mandate_id))
|
|
|
|
return rows_affected > 0
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Löschen des Datensatzes aus Tabelle {table}: {e}")
|
|
return False
|
|
|
|
def record_modify(self, table: str, record_id: Union[str, int], record_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Ändert einen Datensatz in der Tabelle.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
record_id: ID des zu ändernden Datensatzes
|
|
record_data: Neue Daten für den Datensatz
|
|
|
|
Returns:
|
|
Der aktualisierte Datensatz
|
|
"""
|
|
# Prüfe, ob es sich um die initiale ID handelt und die ID geändert werden soll
|
|
initial_id = self.get_initial_id(table)
|
|
if initial_id is not None and initial_id == record_id and "id" in record_data and record_data["id"] != record_id:
|
|
raise ValueError(f"Die ID des initialen Datensatzes in Tabelle {table} kann nicht geändert werden")
|
|
|
|
# Prüfe zuerst, ob der Datensatz zum aktuellen Mandanten gehört
|
|
check_query = f"""
|
|
SELECT mandate_id FROM {table} WHERE id = %s
|
|
"""
|
|
|
|
try:
|
|
result = self._execute_select(check_query, (record_id,))
|
|
|
|
if not result:
|
|
# Datensatz nicht gefunden
|
|
raise ValueError(f"Datensatz mit ID {record_id} nicht gefunden in Tabelle {table}")
|
|
|
|
if result[0]["mandate_id"] != self.mandate_id:
|
|
raise ValueError("Not your mandate")
|
|
|
|
# Erstelle die SET-Klausel und Parameter für das Update
|
|
set_clauses = []
|
|
values = []
|
|
|
|
for key, value in record_data.items():
|
|
set_clauses.append(f"{key} = %s")
|
|
values.append(value)
|
|
|
|
set_clause = ", ".join(set_clauses)
|
|
values.append(record_id) # Für die WHERE-Bedingung
|
|
values.append(self.mandate_id) # Für die mandate_id-Bedingung
|
|
|
|
# Aktualisiere den Datensatz
|
|
update_query = f"""
|
|
UPDATE {table}
|
|
SET {set_clause}
|
|
WHERE id = %s AND mandate_id = %s
|
|
"""
|
|
|
|
rows_affected = self._execute_update(update_query, tuple(values))
|
|
|
|
if rows_affected > 0:
|
|
# Lade den aktualisierten Datensatz
|
|
get_query = f"""
|
|
SELECT * FROM {table} WHERE id = %s
|
|
"""
|
|
|
|
updated_record = self._execute_select(get_query, (record_id,))
|
|
|
|
if updated_record:
|
|
return updated_record[0]
|
|
else:
|
|
raise ValueError(f"Fehler beim Abrufen des aktualisierten Datensatzes aus Tabelle {table}")
|
|
else:
|
|
raise ValueError(f"Fehler beim Aktualisieren des Datensatzes in Tabelle {table}")
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Aktualisieren des Datensatzes in Tabelle {table}: {e}")
|
|
raise
|
|
|
|
# System-Tabellen-Funktionen
|
|
|
|
def register_initial_id(self, table: str, initial_id: int) -> bool:
|
|
"""
|
|
Registriert die initiale ID für eine Tabelle.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
initial_id: Die initiale ID
|
|
|
|
Returns:
|
|
True bei Erfolg, False bei Fehler
|
|
"""
|
|
try:
|
|
# Prüfe zuerst, ob bereits eine initiale ID für diese Tabelle registriert ist
|
|
check_query = f"""
|
|
SELECT COUNT(*) as count
|
|
FROM {self._system_table_name}
|
|
WHERE table_name = %s
|
|
"""
|
|
|
|
result = self._execute_select(check_query, (table,))
|
|
|
|
if result and result[0]["count"] > 0:
|
|
# Bereits registriert
|
|
return True
|
|
|
|
# Registriere die initiale ID
|
|
insert_query = f"""
|
|
INSERT INTO {self._system_table_name} (table_name, initial_id)
|
|
VALUES (%s, %s)
|
|
"""
|
|
|
|
self._execute_insert(insert_query, (table, initial_id))
|
|
logger.info(f"Initiale ID {initial_id} für Tabelle {table} registriert")
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Registrieren der initialen ID für Tabelle {table}: {e}")
|
|
return False
|
|
|
|
def get_initial_id(self, table: str) -> Optional[int]:
|
|
"""
|
|
Gibt die initiale ID für eine Tabelle zurück.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
|
|
Returns:
|
|
Die initiale ID oder None, wenn nicht vorhanden
|
|
"""
|
|
try:
|
|
query = f"""
|
|
SELECT initial_id
|
|
FROM {self._system_table_name}
|
|
WHERE table_name = %s
|
|
"""
|
|
|
|
result = self._execute_select(query, (table,))
|
|
|
|
if result and len(result) > 0:
|
|
logger.info(f"Gefundene initiale ID für Tabelle {table}: {result[0]['initial_id']}")
|
|
return result[0]["initial_id"]
|
|
|
|
# Wenn keine initiale ID gefunden wurde, versuche den ersten Datensatz zu verwenden
|
|
if table and not table.startswith("_"):
|
|
try:
|
|
query = f"""
|
|
SELECT id
|
|
FROM {table}
|
|
ORDER BY id
|
|
LIMIT 1
|
|
"""
|
|
|
|
first_record = self._execute_select(query)
|
|
|
|
if first_record and len(first_record) > 0 and "id" in first_record[0]:
|
|
first_id = first_record[0]["id"]
|
|
# Registriere diese ID als initiale ID
|
|
self.register_initial_id(table, first_id)
|
|
logger.info(f"Automatisch erkannte initiale ID {first_id} für Tabelle {table}")
|
|
return first_id
|
|
except Exception as inner_e:
|
|
logger.warning(f"Konnte keinen ersten Datensatz in Tabelle {table} finden: {inner_e}")
|
|
|
|
logger.debug(f"Keine initiale ID für Tabelle {table} gefunden")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Abrufen der initialen ID für Tabelle {table}: {e}")
|
|
return None
|
|
|
|
def has_initial_id(self, table: str) -> bool:
|
|
"""
|
|
Prüft, ob eine initiale ID für eine Tabelle registriert ist.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
|
|
Returns:
|
|
True, wenn eine initiale ID registriert ist, sonst False
|
|
"""
|
|
try:
|
|
query = f"""
|
|
SELECT COUNT(*) as count
|
|
FROM {self._system_table_name}
|
|
WHERE table_name = %s
|
|
"""
|
|
|
|
result = self._execute_select(query, (table,))
|
|
|
|
if result and len(result) > 0:
|
|
return result[0]["count"] > 0
|
|
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Prüfen der initialen ID für Tabelle {table}: {e}")
|
|
return False
|
|
|
|
def get_all_initial_ids(self) -> Dict[str, int]:
|
|
"""
|
|
Gibt alle registrierten initialen IDs zurück.
|
|
|
|
Returns:
|
|
Dictionary mit Tabellennamen als Schlüssel und initialen IDs als Werte
|
|
"""
|
|
try:
|
|
query = f"""
|
|
SELECT table_name, initial_id
|
|
FROM {self._system_table_name}
|
|
"""
|
|
|
|
result = self._execute_select(query)
|
|
|
|
initial_ids = {}
|
|
for row in result:
|
|
initial_ids[row["table_name"]] = row["initial_id"]
|
|
|
|
return initial_ids
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Abrufen aller initialen IDs: {e}")
|
|
return {}
|
|
|
|
def close(self):
|
|
"""Schließt die Datenbankverbindung"""
|
|
if hasattr(self, 'connection') and self.connection.is_connected():
|
|
self.connection.close()
|
|
logger.info("Datenbankverbindung geschlossen") |