import json import os from typing import List, Dict, Any, Optional, Union import logging from datetime import datetime logger = logging.getLogger(__name__) class DatabaseConnector: """ Ein Konnektor für JSON-basierte Datenspeicherung. Stellt generische Datenbankoperationen bereit. """ def __init__(self, db_host: str, db_database: str, db_user: str = None, db_password: str = None, mandate_id: int = None, user_id: int = None): """ Initialisiert den JSON-Datenbankkonnektor. Args: db_host: Verzeichnis für die JSON-Dateien db_database = Datenbankname db_user: Benutzername für die Authentifizierung (optional) db_password: API-Schlüssel für die Authentifizierung (optional) mandate_id: Kontext-Parameter für den Mandanten user_id: Kontext-Parameter für den Benutzer """ # Speichere die Eingabeparameter self.db_host = db_host self.db_database = db_database self.db_user = db_user self.db_password = db_password # Prüfe, ob Kontext-Parameter gesetzt sind if mandate_id is None or user_id is None: raise ValueError("mandate_id und user_id müssen gesetzt sein") # Stelle sicher, dass das Datenbankverzeichnis existiert self.db_folder=os.path.join(self.db_host,self.db_database) os.makedirs(self.db_folder, exist_ok=True) # Cache für geladene Daten self._tables_cache = {} # System-Tabelle initialisieren self._system_table_name = "_system" self._initialize_system_table() # Temporär mandate_id und user_id speichern self._mandate_id = mandate_id self._user_id = user_id # Wenn mandate_id oder user_id 0 sind, versuche die initialen IDs zu verwenden if mandate_id == 0: initial_mandate_id = self.get_initial_id("mandates") if initial_mandate_id is not None: self._mandate_id = initial_mandate_id logger.info(f"Verwende initiale mandate_id: {initial_mandate_id} statt 0") if user_id == 0: initial_user_id = self.get_initial_id("users") if initial_user_id is not None: self._user_id = initial_user_id logger.info(f"Verwende initiale user_id: {initial_user_id} statt 0") # Setze die effektiven IDs als Eigenschaften self.mandate_id = self._mandate_id self.user_id = self._user_id logger.info(f"DatabaseConnector initialisiert für Verzeichnis: {self.db_folder}") logger.debug(f"Kontext: mandate_id={self.mandate_id}, user_id={self.user_id}") def _initialize_system_table(self): """Initialisiert die System-Tabelle, falls sie noch nicht existiert.""" system_table_path = self._get_table_path(self._system_table_name) if not os.path.exists(system_table_path): empty_system_table = {} self._save_system_table(empty_system_table) logger.info(f"System-Tabelle initialisiert in {system_table_path}") def _load_system_table(self) -> Dict[str, int]: """Lädt die System-Tabelle mit den initialen IDs.""" system_table_path = self._get_table_path(self._system_table_name) try: if os.path.exists(system_table_path): with open(system_table_path, 'r', encoding='utf-8') as f: return json.load(f) else: return {} except Exception as e: logger.error(f"Fehler beim Laden der System-Tabelle: {e}") return {} def _save_system_table(self, data: Dict[str, int]) -> bool: """Speichert die System-Tabelle mit den initialen IDs.""" system_table_path = self._get_table_path(self._system_table_name) try: with open(system_table_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) return True except Exception as e: logger.error(f"Fehler beim Speichern der System-Tabelle: {e}") return False def _get_table_path(self, table: str) -> str: """Gibt den vollständigen Pfad zu einer Tabellendatei zurück""" return os.path.join(self.db_folder, f"{table}.json") def _load_table(self, table: str) -> List[Dict[str, Any]]: """Lädt eine Tabelle aus der entsprechenden JSON-Datei""" path = self._get_table_path(table) # Wenn die Tabelle die System-Tabelle ist, lade sie direkt if table == self._system_table_name: return [] # Die System-Tabelle wird nicht wie normale Tabellen behandelt # Wenn die Tabelle bereits im Cache ist, verwende den Cache if table in self._tables_cache: # logger.info(f"Lade Tabelle {table} aus Cache") return self._tables_cache[table] # Ansonsten lade die Datei try: if os.path.exists(path): # logger.info(f"Lade Tabelle {table} aus JSON {path}") with open(path, 'r', encoding='utf-8') as f: data = json.load(f) self._tables_cache[table] = data # Wenn Daten geladen wurden und noch keine initiale ID registriert ist, # registriere die ID des ersten Datensatzes (falls vorhanden) if data and not self.has_initial_id(table): if "id" in data[0]: self.register_initial_id(table, data[0]["id"]) logger.info(f"Initiale ID {data[0]['id']} für Tabelle {table} nachträglich registriert") return data else: # Wenn die Datei nicht existiert, erstelle eine leere Tabelle logger.info(f"Neue Tabelle {table}") self._tables_cache[table] = [] self._save_table(table, []) return [] except Exception as e: logger.error(f"Fehler beim Laden der Tabelle {table}: {e}") return [] def _save_table(self, table: str, data: List[Dict[str, Any]]) -> bool: """Speichert eine Tabelle in der entsprechenden JSON-Datei""" # Die System-Tabelle wird speziell behandelt if table == self._system_table_name: return False path = self._get_table_path(table) try: with open(path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) # Aktualisiere den Cache self._tables_cache[table] = data return True except Exception as e: logger.error(f"Fehler beim Speichern der Tabelle {table}: {e}") return False def _filter_by_context(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Filtert Datensätze nach dem Mandanten- und Benutzerkontext, sofern diese Felder im Datensatz existieren. """ filtered_records = [] for record in records: # Prüfe, ob mandate_id im Datensatz existiert und nicht null ist has_mandate = "mandate_id" in record and record["mandate_id"] is not None and record["mandate_id"] != "" # Prüfe, ob user_id im Datensatz existiert und nicht null ist has_user = "user_id" in record and record["user_id"] is not None and record["user_id"] != "" # Wenn beides existiert, filtere entsprechend if has_mandate and has_user: if record["mandate_id"] == self.mandate_id: filtered_records.append(record) # Wenn nur mandate_id existiert elif has_mandate and not has_user: if record["mandate_id"] == self.mandate_id: filtered_records.append(record) # Wenn weder mandate_id noch user_id existieren, füge den Datensatz hinzu elif not has_mandate and not has_user: filtered_records.append(record) return filtered_records def _apply_record_filter(self, records: List[Dict[str, Any]], record_filter: Dict[str, Any] = None) -> List[Dict[str, Any]]: """Wendet einen Datensatzfilter auf die Datensätze an""" if not record_filter: return records filtered_records = [] for record in records: match = True for field, value in record_filter.items(): # Prüfen, ob das Feld existiert if field not in record: match = False break # Wenn der Filterwert ein Integer-String ist und das Datensatzfeld ein Integer if isinstance(value, str) and value.isdigit() and isinstance(record[field], int): if record[field] != int(value): match = False break # Sonst direkter Vergleich elif record[field] != value: match = False break if match: filtered_records.append(record) return filtered_records # Public API def get_tables(self, filter_criteria: Dict[str, Any] = None) -> List[str]: """ Gibt eine Liste aller verfügbaren Tabellen zurück. Args: filter_criteria: Optionale Filterkriterien (nicht implementiert) Returns: Liste der Tabellennamen """ tables = [] try: for filename in os.listdir(self.db_folder): if filename.endswith('.json') and not filename.startswith('_'): table_name = filename[:-5] # Entferne die .json-Endung tables.append(table_name) except Exception as e: logger.error(f"Fehler beim Lesen des Datenbankverzeichnisses: {e}") return tables def get_fields(self, table: str, filter_criteria: Dict[str, Any] = None) -> List[str]: """ Gibt eine Liste aller Felder einer Tabelle zurück. Args: table: Name der Tabelle filter_criteria: Optionale Filterkriterien (nicht implementiert) Returns: Liste der Feldnamen """ # Lade die Tabellendaten data = self._load_table(table) if not data: return [] # Nehme den ersten Datensatz als Referenz für die Felder fields = list(data[0].keys()) if data else [] return fields def get_schema(self, table: str, language: str = None, filter_criteria: Dict[str, Any] = None) -> Dict[str, Dict[str, Any]]: """ Gibt ein Schema-Objekt für eine Tabelle zurück mit Datentypen und Labels. Args: table: Name der Tabelle language: Sprache für die Labels (optional) filter_criteria: Optionale Filterkriterien (nicht implementiert) Returns: Schema-Objekt mit Feldern, Datentypen und Labels """ # Lade die Tabellendaten data = self._load_table(table) schema = {} if not data: return schema # Nehme den ersten Datensatz als Referenz für die Felder und Datentypen first_record = data[0] for field, value in first_record.items(): # Bestimme den Datentyp data_type = type(value).__name__ # Label erstellen (Standardwert ist der Feldname) label = field # Wenn model_info verfügbar ist, versuche das Label aus dem Modell zu holen # Implementierung hängt vom tatsächlichen Modell ab schema[field] = { "type": data_type, "label": label } return schema def get_recordset(self, table: str, field_filter: Dict[str, Any] = None, record_filter: Dict[str, Any] = None) -> List[Dict[str, Any]]: """ Gibt eine Liste von Datensätzen aus einer Tabelle zurück, gefiltert nach Kriterien. Args: table: Name der Tabelle field_filter: Filter für Felder (welche Felder zurückgegeben werden sollen) record_filter: Filter für Datensätze (welche Datensätze zurückgegeben werden sollen) Returns: Liste der gefilterten Datensätze """ # Lade die Tabellendaten data = self._load_table(table) # Filtere nach Mandanten- und Benutzerkontext filtered_data = self._filter_by_context(data) # Wende record_filter an, wenn vorhanden if record_filter: filtered_data = self._apply_record_filter(filtered_data, record_filter) # Wenn field_filter vorhanden ist, reduziere die Felder if field_filter and isinstance(field_filter, list): result = [] for record in filtered_data: filtered_record = {} for field in field_filter: if field in record: filtered_record[field] = record[field] result.append(filtered_record) return result return filtered_data def record_create(self, table: str, record_data: Dict[str, Any]) -> Dict[str, Any]: """ Erstellt einen neuen Datensatz in der Tabelle. Args: table: Name der Tabelle record_data: Daten für den neuen Datensatz Returns: Der erstellte Datensatz """ # Lade die Tabellendaten data = self._load_table(table) # Füge mandate_id und user_id hinzu, falls nicht vorhanden oder 0 if "mandate_id" not in record_data or record_data["mandate_id"] == 0: record_data["mandate_id"] = self.mandate_id if "user_id" not in record_data or record_data["user_id"] == 0: record_data["user_id"] = self.user_id # Bestimme die nächste ID, falls nicht vorhanden if "id" not in record_data: next_id = 1 if data: next_id = max(record["id"] for record in data if "id" in record) + 1 record_data["id"] = next_id # Wenn die Tabelle leer ist und eine System-ID registriert werden soll if not data: self.register_initial_id(table, record_data["id"]) logger.info(f"Initiale ID {record_data['id']} für Tabelle {table} registriert") # Füge den neuen Datensatz hinzu data.append(record_data) # Speichere die aktualisierte Tabelle if self._save_table(table, data): return record_data else: raise ValueError(f"Fehler beim Erstellen des Datensatzes in Tabelle {table}") def record_delete(self, table: str, record_id: Union[str, int]) -> bool: """ Löscht einen Datensatz aus der Tabelle. Args: table: Name der Tabelle record_id: ID des zu löschenden Datensatzes Returns: True bei Erfolg, False bei Fehler """ # Lade die Tabellendaten data = self._load_table(table) # Prüfe, ob es sich um die initiale ID handelt initial_id = self.get_initial_id(table) if initial_id is not None and initial_id == record_id: logger.warning(f"Versuch, den initialen Datensatz mit ID {record_id} aus Tabelle {table} zu löschen, wurde verhindert") return False # Suche den Datensatz for i, record in enumerate(data): if "id" in record and record["id"] == record_id: # Prüfe, ob der Datensatz zum aktuellen Mandanten gehört if "mandate_id" in record and record["mandate_id"] != self.mandate_id: raise ValueError("Not your mandate") # Lösche den Datensatz del data[i] # Speichere die aktualisierte Tabelle return self._save_table(table, data) # Datensatz nicht gefunden return False def record_modify(self, table: str, record_id: Union[str, int], record_data: Dict[str, Any]) -> Dict[str, Any]: """ Ändert einen Datensatz in der Tabelle. Args: table: Name der Tabelle record_id: ID des zu ändernden Datensatzes record_data: Neue Daten für den Datensatz Returns: Der aktualisierte Datensatz """ # Lade die Tabellendaten data = self._load_table(table) # Suche den Datensatz for i, record in enumerate(data): if "id" in record and record["id"] == record_id: # Prüfe, ob der Datensatz zum aktuellen Mandanten gehört if "mandate_id" in record and record["mandate_id"] != self.mandate_id: raise ValueError("Not your mandate") # Verhindere Änderung der ID bei initialem Datensatz initial_id = self.get_initial_id(table) if initial_id is not None and initial_id == record_id and "id" in record_data and record_data["id"] != record_id: raise ValueError(f"Die ID des initialen Datensatzes in Tabelle {table} kann nicht geändert werden") # Aktualisiere den Datensatz for key, value in record_data.items(): data[i][key] = value # Speichere die aktualisierte Tabelle if self._save_table(table, data): return data[i] else: raise ValueError(f"Fehler beim Aktualisieren des Datensatzes in Tabelle {table}") # Datensatz nicht gefunden raise ValueError(f"Datensatz mit ID {record_id} nicht gefunden in Tabelle {table}") # System-Tabellen-Funktionen def register_initial_id(self, table: str, initial_id: int) -> bool: """ Registriert die initiale ID für eine Tabelle. Args: table: Name der Tabelle initial_id: Die initiale ID Returns: True bei Erfolg, False bei Fehler """ try: # Lade die aktuelle System-Tabelle system_data = self._load_system_table() # Nur registrieren, wenn noch nicht vorhanden if table not in system_data: system_data[table] = initial_id success = self._save_system_table(system_data) if success: logger.info(f"Initiale ID {initial_id} für Tabelle {table} registriert") return success return True # Wenn bereits vorhanden, ist das kein Fehler except Exception as e: logger.error(f"Fehler beim Registrieren der initialen ID für Tabelle {table}: {e}") return False def get_initial_id(self, table: str) -> Optional[int]: """ Gibt die initiale ID für eine Tabelle zurück. Args: table: Name der Tabelle Returns: Die initiale ID oder None, wenn nicht vorhanden """ system_data = self._load_system_table() initial_id = system_data.get(table) if initial_id is None: logger.debug(f"Keine initiale ID für Tabelle {table} gefunden") return initial_id def has_initial_id(self, table: str) -> bool: """ Prüft, ob eine initiale ID für eine Tabelle registriert ist. Args: table: Name der Tabelle Returns: True, wenn eine initiale ID registriert ist, sonst False """ system_data = self._load_system_table() return table in system_data def get_all_initial_ids(self) -> Dict[str, int]: """ Gibt alle registrierten initialen IDs zurück. Returns: Dictionary mit Tabellennamen als Schlüssel und initialen IDs als Werte """ system_data = self._load_system_table() return system_data.copy() # Kopie zurückgeben, um das Original zu schützen