529 lines
No EOL
20 KiB
Python
529 lines
No EOL
20 KiB
Python
import json
|
|
import os
|
|
from typing import List, Dict, Any, Optional, Union
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DatabaseConnector:
|
|
"""
|
|
Ein Konnektor für JSON-basierte Datenspeicherung.
|
|
Stellt generische Datenbankoperationen bereit.
|
|
"""
|
|
|
|
def __init__(self, db_folder: str, db_user: str = None, db_apikey: str = None, mandate_id: int = None, user_id: int = None):
|
|
"""
|
|
Initialisiert den JSON-Datenbankkonnektor.
|
|
|
|
Args:
|
|
db_folder: Verzeichnis für die JSON-Dateien
|
|
db_user: Benutzername für die Authentifizierung (optional)
|
|
db_apikey: API-Schlüssel für die Authentifizierung (optional)
|
|
mandate_id: Kontext-Parameter für den Mandanten
|
|
user_id: Kontext-Parameter für den Benutzer
|
|
"""
|
|
# Speichere die Eingabeparameter
|
|
self.db_folder = db_folder
|
|
self.db_user = db_user
|
|
self.db_apikey = db_apikey
|
|
|
|
# Prüfe, ob Kontext-Parameter gesetzt sind
|
|
if mandate_id is None or user_id is None:
|
|
raise ValueError("mandate_id und user_id müssen gesetzt sein")
|
|
|
|
# Stelle sicher, dass das Datenbankverzeichnis existiert
|
|
os.makedirs(db_folder, exist_ok=True)
|
|
|
|
# Cache für geladene Daten
|
|
self._tables_cache = {}
|
|
|
|
# System-Tabelle initialisieren
|
|
self._system_table_name = "_system"
|
|
self._initialize_system_table()
|
|
|
|
# Temporär mandate_id und user_id speichern
|
|
self._mandate_id = mandate_id
|
|
self._user_id = user_id
|
|
|
|
# Wenn mandate_id oder user_id 0 sind, versuche die initialen IDs zu verwenden
|
|
if mandate_id == 0:
|
|
initial_mandate_id = self.get_initial_id("mandates")
|
|
if initial_mandate_id is not None:
|
|
self._mandate_id = initial_mandate_id
|
|
logger.info(f"Verwende initiale mandate_id: {initial_mandate_id} statt 0")
|
|
|
|
if user_id == 0:
|
|
initial_user_id = self.get_initial_id("users")
|
|
if initial_user_id is not None:
|
|
self._user_id = initial_user_id
|
|
logger.info(f"Verwende initiale user_id: {initial_user_id} statt 0")
|
|
|
|
# Setze die effektiven IDs als Eigenschaften
|
|
self.mandate_id = self._mandate_id
|
|
self.user_id = self._user_id
|
|
|
|
logger.info(f"DatabaseConnector initialisiert für Verzeichnis: {db_folder}")
|
|
logger.info(f"Kontext: mandate_id={self.mandate_id}, user_id={self.user_id}")
|
|
|
|
def _initialize_system_table(self):
|
|
"""Initialisiert die System-Tabelle, falls sie noch nicht existiert."""
|
|
system_table_path = self._get_table_path(self._system_table_name)
|
|
if not os.path.exists(system_table_path):
|
|
empty_system_table = {}
|
|
self._save_system_table(empty_system_table)
|
|
logger.info(f"System-Tabelle initialisiert in {system_table_path}")
|
|
|
|
def _load_system_table(self) -> Dict[str, int]:
|
|
"""Lädt die System-Tabelle mit den initialen IDs."""
|
|
system_table_path = self._get_table_path(self._system_table_name)
|
|
try:
|
|
if os.path.exists(system_table_path):
|
|
with open(system_table_path, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
else:
|
|
return {}
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Laden der System-Tabelle: {e}")
|
|
return {}
|
|
|
|
def _save_system_table(self, data: Dict[str, int]) -> bool:
|
|
"""Speichert die System-Tabelle mit den initialen IDs."""
|
|
system_table_path = self._get_table_path(self._system_table_name)
|
|
try:
|
|
with open(system_table_path, 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, indent=2, ensure_ascii=False)
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Speichern der System-Tabelle: {e}")
|
|
return False
|
|
|
|
def _get_table_path(self, table: str) -> str:
|
|
"""Gibt den vollständigen Pfad zu einer Tabellendatei zurück"""
|
|
return os.path.join(self.db_folder, f"{table}.json")
|
|
|
|
def _load_table(self, table: str) -> List[Dict[str, Any]]:
|
|
"""Lädt eine Tabelle aus der entsprechenden JSON-Datei"""
|
|
path = self._get_table_path(table)
|
|
|
|
# Wenn die Tabelle die System-Tabelle ist, lade sie direkt
|
|
if table == self._system_table_name:
|
|
return [] # Die System-Tabelle wird nicht wie normale Tabellen behandelt
|
|
|
|
# Wenn die Tabelle bereits im Cache ist, verwende den Cache
|
|
if table in self._tables_cache:
|
|
logger.info(f"Lade Tabelle {table} aus Cache")
|
|
return self._tables_cache[table]
|
|
|
|
# Ansonsten lade die Datei
|
|
try:
|
|
if os.path.exists(path):
|
|
logger.info(f"Lade Tabelle {table} aus JSON {path}")
|
|
with open(path, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
self._tables_cache[table] = data
|
|
|
|
# Wenn Daten geladen wurden und noch keine initiale ID registriert ist,
|
|
# registriere die ID des ersten Datensatzes (falls vorhanden)
|
|
if data and not self.has_initial_id(table):
|
|
if "id" in data[0]:
|
|
self.register_initial_id(table, data[0]["id"])
|
|
logger.info(f"Initiale ID {data[0]['id']} für Tabelle {table} nachträglich registriert")
|
|
|
|
return data
|
|
else:
|
|
# Wenn die Datei nicht existiert, erstelle eine leere Tabelle
|
|
logger.info(f"Neue Tabelle {table}")
|
|
self._tables_cache[table] = []
|
|
self._save_table(table, [])
|
|
return []
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Laden der Tabelle {table}: {e}")
|
|
return []
|
|
|
|
def _save_table(self, table: str, data: List[Dict[str, Any]]) -> bool:
|
|
"""Speichert eine Tabelle in der entsprechenden JSON-Datei"""
|
|
# Die System-Tabelle wird speziell behandelt
|
|
if table == self._system_table_name:
|
|
return False
|
|
|
|
path = self._get_table_path(table)
|
|
try:
|
|
with open(path, 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, indent=2, ensure_ascii=False)
|
|
|
|
# Aktualisiere den Cache
|
|
self._tables_cache[table] = data
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Speichern der Tabelle {table}: {e}")
|
|
return False
|
|
|
|
def _filter_by_context(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Filtert Datensätze nach dem Mandanten- und Benutzerkontext,
|
|
sofern diese Felder im Datensatz existieren.
|
|
"""
|
|
filtered_records = []
|
|
|
|
for record in records:
|
|
# Prüfe, ob mandate_id im Datensatz existiert und nicht null ist
|
|
has_mandate = "mandate_id" in record and record["mandate_id"] is not None and record["mandate_id"] != ""
|
|
|
|
# Prüfe, ob user_id im Datensatz existiert und nicht null ist
|
|
has_user = "user_id" in record and record["user_id"] is not None and record["user_id"] != ""
|
|
|
|
# Wenn beides existiert, filtere entsprechend
|
|
if has_mandate and has_user:
|
|
if record["mandate_id"] == self.mandate_id:
|
|
filtered_records.append(record)
|
|
# Wenn nur mandate_id existiert
|
|
elif has_mandate and not has_user:
|
|
if record["mandate_id"] == self.mandate_id:
|
|
filtered_records.append(record)
|
|
# Wenn weder mandate_id noch user_id existieren, füge den Datensatz hinzu
|
|
elif not has_mandate and not has_user:
|
|
filtered_records.append(record)
|
|
|
|
return filtered_records
|
|
|
|
def _apply_record_filter(self, records: List[Dict[str, Any]], record_filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
|
|
"""Wendet einen Datensatzfilter auf die Datensätze an"""
|
|
if not record_filter:
|
|
return records
|
|
|
|
filtered_records = []
|
|
|
|
for record in records:
|
|
match = True
|
|
|
|
for field, value in record_filter.items():
|
|
# Prüfen, ob das Feld existiert
|
|
if field not in record:
|
|
match = False
|
|
break
|
|
|
|
# Wenn der Filterwert ein Integer-String ist und das Datensatzfeld ein Integer
|
|
if isinstance(value, str) and value.isdigit() and isinstance(record[field], int):
|
|
if record[field] != int(value):
|
|
match = False
|
|
break
|
|
# Sonst direkter Vergleich
|
|
elif record[field] != value:
|
|
match = False
|
|
break
|
|
|
|
if match:
|
|
filtered_records.append(record)
|
|
|
|
return filtered_records
|
|
|
|
|
|
# Public API
|
|
|
|
def get_tables(self, filter_criteria: Dict[str, Any] = None) -> List[str]:
|
|
"""
|
|
Gibt eine Liste aller verfügbaren Tabellen zurück.
|
|
|
|
Args:
|
|
filter_criteria: Optionale Filterkriterien (nicht implementiert)
|
|
|
|
Returns:
|
|
Liste der Tabellennamen
|
|
"""
|
|
tables = []
|
|
|
|
try:
|
|
for filename in os.listdir(self.db_folder):
|
|
if filename.endswith('.json') and not filename.startswith('_'):
|
|
table_name = filename[:-5] # Entferne die .json-Endung
|
|
tables.append(table_name)
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Lesen des Datenbankverzeichnisses: {e}")
|
|
|
|
return tables
|
|
|
|
def get_fields(self, table: str, filter_criteria: Dict[str, Any] = None) -> List[str]:
|
|
"""
|
|
Gibt eine Liste aller Felder einer Tabelle zurück.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
filter_criteria: Optionale Filterkriterien (nicht implementiert)
|
|
|
|
Returns:
|
|
Liste der Feldnamen
|
|
"""
|
|
# Lade die Tabellendaten
|
|
data = self._load_table(table)
|
|
|
|
if not data:
|
|
return []
|
|
|
|
# Nehme den ersten Datensatz als Referenz für die Felder
|
|
fields = list(data[0].keys()) if data else []
|
|
|
|
return fields
|
|
|
|
def get_schema(self, table: str, language: str = None, filter_criteria: Dict[str, Any] = None) -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Gibt ein Schema-Objekt für eine Tabelle zurück mit Datentypen und Labels.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
language: Sprache für die Labels (optional)
|
|
filter_criteria: Optionale Filterkriterien (nicht implementiert)
|
|
|
|
Returns:
|
|
Schema-Objekt mit Feldern, Datentypen und Labels
|
|
"""
|
|
# Lade die Tabellendaten
|
|
data = self._load_table(table)
|
|
|
|
schema = {}
|
|
|
|
if not data:
|
|
return schema
|
|
|
|
# Nehme den ersten Datensatz als Referenz für die Felder und Datentypen
|
|
first_record = data[0]
|
|
|
|
for field, value in first_record.items():
|
|
# Bestimme den Datentyp
|
|
data_type = type(value).__name__
|
|
|
|
# Label erstellen (Standardwert ist der Feldname)
|
|
label = field
|
|
|
|
# Wenn model_info verfügbar ist, versuche das Label aus dem Modell zu holen
|
|
# Implementierung hängt vom tatsächlichen Modell ab
|
|
|
|
schema[field] = {
|
|
"type": data_type,
|
|
"label": label
|
|
}
|
|
|
|
return schema
|
|
|
|
def get_recordset(self, table: str, field_filter: Dict[str, Any] = None, record_filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
|
|
"""
|
|
Gibt eine Liste von Datensätzen aus einer Tabelle zurück, gefiltert nach Kriterien.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
field_filter: Filter für Felder (welche Felder zurückgegeben werden sollen)
|
|
record_filter: Filter für Datensätze (welche Datensätze zurückgegeben werden sollen)
|
|
|
|
Returns:
|
|
Liste der gefilterten Datensätze
|
|
"""
|
|
# Lade die Tabellendaten
|
|
data = self._load_table(table)
|
|
|
|
# Filtere nach Mandanten- und Benutzerkontext
|
|
filtered_data = self._filter_by_context(data)
|
|
|
|
# Wende record_filter an, wenn vorhanden
|
|
if record_filter:
|
|
filtered_data = self._apply_record_filter(filtered_data, record_filter)
|
|
|
|
# Wenn field_filter vorhanden ist, reduziere die Felder
|
|
if field_filter and isinstance(field_filter, list):
|
|
result = []
|
|
for record in filtered_data:
|
|
filtered_record = {}
|
|
for field in field_filter:
|
|
if field in record:
|
|
filtered_record[field] = record[field]
|
|
result.append(filtered_record)
|
|
return result
|
|
|
|
return filtered_data
|
|
|
|
def record_create(self, table: str, record_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Erstellt einen neuen Datensatz in der Tabelle.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
record_data: Daten für den neuen Datensatz
|
|
|
|
Returns:
|
|
Der erstellte Datensatz
|
|
"""
|
|
# Lade die Tabellendaten
|
|
data = self._load_table(table)
|
|
|
|
# Füge mandate_id und user_id hinzu, falls nicht vorhanden oder 0
|
|
if "mandate_id" not in record_data or record_data["mandate_id"] == 0:
|
|
record_data["mandate_id"] = self.mandate_id
|
|
|
|
if "user_id" not in record_data or record_data["user_id"] == 0:
|
|
record_data["user_id"] = self.user_id
|
|
|
|
# Bestimme die nächste ID, falls nicht vorhanden
|
|
if "id" not in record_data:
|
|
next_id = 1
|
|
if data:
|
|
next_id = max(record["id"] for record in data if "id" in record) + 1
|
|
record_data["id"] = next_id
|
|
|
|
# Wenn die Tabelle leer ist und eine System-ID registriert werden soll
|
|
if not data:
|
|
self.register_initial_id(table, record_data["id"])
|
|
logger.info(f"Initiale ID {record_data['id']} für Tabelle {table} registriert")
|
|
|
|
# Füge den neuen Datensatz hinzu
|
|
data.append(record_data)
|
|
|
|
# Speichere die aktualisierte Tabelle
|
|
if self._save_table(table, data):
|
|
return record_data
|
|
else:
|
|
raise ValueError(f"Fehler beim Erstellen des Datensatzes in Tabelle {table}")
|
|
|
|
def record_delete(self, table: str, record_id: Union[str, int]) -> bool:
|
|
"""
|
|
Löscht einen Datensatz aus der Tabelle.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
record_id: ID des zu löschenden Datensatzes
|
|
|
|
Returns:
|
|
True bei Erfolg, False bei Fehler
|
|
"""
|
|
# Lade die Tabellendaten
|
|
data = self._load_table(table)
|
|
|
|
# Prüfe, ob es sich um die initiale ID handelt
|
|
initial_id = self.get_initial_id(table)
|
|
if initial_id is not None and initial_id == record_id:
|
|
logger.warning(f"Versuch, den initialen Datensatz mit ID {record_id} aus Tabelle {table} zu löschen, wurde verhindert")
|
|
return False
|
|
|
|
# Suche den Datensatz
|
|
for i, record in enumerate(data):
|
|
if "id" in record and record["id"] == record_id:
|
|
# Prüfe, ob der Datensatz zum aktuellen Mandanten gehört
|
|
if "mandate_id" in record and record["mandate_id"] != self.mandate_id:
|
|
raise ValueError("Not your mandate")
|
|
|
|
# Lösche den Datensatz
|
|
del data[i]
|
|
|
|
# Speichere die aktualisierte Tabelle
|
|
return self._save_table(table, data)
|
|
|
|
# Datensatz nicht gefunden
|
|
return False
|
|
|
|
def record_modify(self, table: str, record_id: Union[str, int], record_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Ändert einen Datensatz in der Tabelle.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
record_id: ID des zu ändernden Datensatzes
|
|
record_data: Neue Daten für den Datensatz
|
|
|
|
Returns:
|
|
Der aktualisierte Datensatz
|
|
"""
|
|
# Lade die Tabellendaten
|
|
data = self._load_table(table)
|
|
|
|
# Suche den Datensatz
|
|
for i, record in enumerate(data):
|
|
if "id" in record and record["id"] == record_id:
|
|
# Prüfe, ob der Datensatz zum aktuellen Mandanten gehört
|
|
if "mandate_id" in record and record["mandate_id"] != self.mandate_id:
|
|
raise ValueError("Not your mandate")
|
|
|
|
# Verhindere Änderung der ID bei initialem Datensatz
|
|
initial_id = self.get_initial_id(table)
|
|
if initial_id is not None and initial_id == record_id and "id" in record_data and record_data["id"] != record_id:
|
|
raise ValueError(f"Die ID des initialen Datensatzes in Tabelle {table} kann nicht geändert werden")
|
|
|
|
# Aktualisiere den Datensatz
|
|
for key, value in record_data.items():
|
|
data[i][key] = value
|
|
|
|
# Speichere die aktualisierte Tabelle
|
|
if self._save_table(table, data):
|
|
return data[i]
|
|
else:
|
|
raise ValueError(f"Fehler beim Aktualisieren des Datensatzes in Tabelle {table}")
|
|
|
|
# Datensatz nicht gefunden
|
|
raise ValueError(f"Datensatz mit ID {record_id} nicht gefunden in Tabelle {table}")
|
|
|
|
# System-Tabellen-Funktionen
|
|
|
|
def register_initial_id(self, table: str, initial_id: int) -> bool:
|
|
"""
|
|
Registriert die initiale ID für eine Tabelle.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
initial_id: Die initiale ID
|
|
|
|
Returns:
|
|
True bei Erfolg, False bei Fehler
|
|
"""
|
|
try:
|
|
# Lade die aktuelle System-Tabelle
|
|
system_data = self._load_system_table()
|
|
|
|
# Nur registrieren, wenn noch nicht vorhanden
|
|
if table not in system_data:
|
|
system_data[table] = initial_id
|
|
success = self._save_system_table(system_data)
|
|
if success:
|
|
logger.info(f"Initiale ID {initial_id} für Tabelle {table} registriert")
|
|
return success
|
|
return True # Wenn bereits vorhanden, ist das kein Fehler
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Registrieren der initialen ID für Tabelle {table}: {e}")
|
|
return False
|
|
|
|
def get_initial_id(self, table: str) -> Optional[int]:
|
|
"""
|
|
Gibt die initiale ID für eine Tabelle zurück.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
|
|
Returns:
|
|
Die initiale ID oder None, wenn nicht vorhanden
|
|
"""
|
|
system_data = self._load_system_table()
|
|
initial_id = system_data.get(table)
|
|
if initial_id is None:
|
|
logger.debug(f"Keine initiale ID für Tabelle {table} gefunden")
|
|
return initial_id
|
|
|
|
def has_initial_id(self, table: str) -> bool:
|
|
"""
|
|
Prüft, ob eine initiale ID für eine Tabelle registriert ist.
|
|
|
|
Args:
|
|
table: Name der Tabelle
|
|
|
|
Returns:
|
|
True, wenn eine initiale ID registriert ist, sonst False
|
|
"""
|
|
system_data = self._load_system_table()
|
|
return table in system_data
|
|
|
|
def get_all_initial_ids(self) -> Dict[str, int]:
|
|
"""
|
|
Gibt alle registrierten initialen IDs zurück.
|
|
|
|
Returns:
|
|
Dictionary mit Tabellennamen als Schlüssel und initialen IDs als Werte
|
|
"""
|
|
system_data = self._load_system_table()
|
|
return system_data.copy() # Kopie zurückgeben, um das Original zu schützen |