gateway/gwserver/connector_db_json.py
2025-03-18 23:25:42 +01:00

368 lines
No EOL
14 KiB
Python

import json
import os
from typing import List, Dict, Any, Optional, Union
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class JSONDatabaseConnector:
"""
Ein Konnektor für JSON-basierte Datenspeicherung.
Stellt generische Datenbankoperationen bereit.
"""
def __init__(self, db_folder: str, db_user: str = None, db_apikey: str = None, mandate_id: int = None, user_id: int = None):
"""
Initialisiert den JSON-Datenbankkonnektor.
Args:
db_folder: Verzeichnis für die JSON-Dateien
db_user: Benutzername für die Authentifizierung (optional)
db_apikey: API-Schlüssel für die Authentifizierung (optional)
mandate_id: Kontext-Parameter für den Mandanten
user_id: Kontext-Parameter für den Benutzer
"""
# Speichere die Eingabeparameter
self.db_folder = db_folder
self.db_user = db_user
self.db_apikey = db_apikey
# Prüfe, ob Kontext-Parameter gesetzt sind
if mandate_id is None or user_id is None:
raise ValueError("mandate_id und user_id müssen gesetzt sein")
self.mandate_id = mandate_id
self.user_id = user_id
# Stelle sicher, dass das Datenbankverzeichnis existiert
os.makedirs(db_folder, exist_ok=True)
# Cache für geladene Daten
self._tables_cache = {}
logger.info(f"JSONDatabaseConnector initialisiert für Verzeichnis: {db_folder}")
logger.info(f"Kontext: mandate_id={mandate_id}, user_id={user_id}")
def _get_table_path(self, table: str) -> str:
"""Gibt den vollständigen Pfad zu einer Tabellendatei zurück"""
return os.path.join(self.db_folder, f"{table}.json")
def _load_table(self, table: str) -> List[Dict[str, Any]]:
"""Lädt eine Tabelle aus der entsprechenden JSON-Datei"""
path = self._get_table_path(table)
# Wenn die Tabelle bereits im Cache ist, verwende den Cache
if table in self._tables_cache:
logger.info(f"Lade Tabelle {table} aus Cache")
return self._tables_cache[table]
# Ansonsten lade die Datei
try:
if os.path.exists(path):
logger.info(f"Lade Tabelle {table} aus JSON {path}")
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
self._tables_cache[table] = data
return data
else:
# Wenn die Datei nicht existiert, erstelle eine leere Tabelle
logger.info(f"Neue Tabelle {table}")
self._tables_cache[table] = []
self._save_table(table, [])
return []
except Exception as e:
logger.error(f"Fehler beim Laden der Tabelle {table}: {e}")
return []
def _save_table(self, table: str, data: List[Dict[str, Any]]) -> bool:
"""Speichert eine Tabelle in der entsprechenden JSON-Datei"""
path = self._get_table_path(table)
try:
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# Aktualisiere den Cache
self._tables_cache[table] = data
return True
except Exception as e:
logger.error(f"Fehler beim Speichern der Tabelle {table}: {e}")
return False
def _filter_by_context(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Filtert Datensätze nach dem Mandanten- und Benutzerkontext,
sofern diese Felder im Datensatz existieren.
"""
filtered_records = []
for record in records:
# Prüfe, ob mandate_id im Datensatz existiert und nicht null ist
has_mandate = "mandate_id" in record and record["mandate_id"] is not None and record["mandate_id"] != ""
# Prüfe, ob user_id im Datensatz existiert und nicht null ist
has_user = "user_id" in record and record["user_id"] is not None and record["user_id"] != ""
# Wenn beides existiert, filtere entsprechend
if has_mandate and has_user:
if record["mandate_id"] == self.mandate_id:
filtered_records.append(record)
# Wenn nur mandate_id existiert
elif has_mandate and not has_user:
if record["mandate_id"] == self.mandate_id:
filtered_records.append(record)
# Wenn weder mandate_id noch user_id existieren, füge den Datensatz hinzu
elif not has_mandate and not has_user:
filtered_records.append(record)
return filtered_records
def _apply_record_filter(self, records: List[Dict[str, Any]], record_filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""Wendet einen Datensatzfilter auf die Datensätze an"""
if not record_filter:
return records
filtered_records = []
for record in records:
match = True
for field, value in record_filter.items():
# Prüfen, ob das Feld existiert
if field not in record:
match = False
break
# Wenn der Filterwert ein Integer-String ist und das Datensatzfeld ein Integer
if isinstance(value, str) and value.isdigit() and isinstance(record[field], int):
if record[field] != int(value):
match = False
break
# Sonst direkter Vergleich
elif record[field] != value:
match = False
break
if match:
filtered_records.append(record)
return filtered_records
# Public API
def get_tables(self, filter_criteria: Dict[str, Any] = None) -> List[str]:
"""
Gibt eine Liste aller verfügbaren Tabellen zurück.
Args:
filter_criteria: Optionale Filterkriterien (nicht implementiert)
Returns:
Liste der Tabellennamen
"""
tables = []
try:
for filename in os.listdir(self.db_folder):
if filename.endswith('.json'):
table_name = filename[:-5] # Entferne die .json-Endung
tables.append(table_name)
except Exception as e:
logger.error(f"Fehler beim Lesen des Datenbankverzeichnisses: {e}")
return tables
def get_fields(self, table: str, filter_criteria: Dict[str, Any] = None) -> List[str]:
"""
Gibt eine Liste aller Felder einer Tabelle zurück.
Args:
table: Name der Tabelle
filter_criteria: Optionale Filterkriterien (nicht implementiert)
Returns:
Liste der Feldnamen
"""
# Lade die Tabellendaten
data = self._load_table(table)
if not data:
return []
# Nehme den ersten Datensatz als Referenz für die Felder
fields = list(data[0].keys()) if data else []
return fields
def get_schema(self, table: str, language: str = None, filter_criteria: Dict[str, Any] = None) -> Dict[str, Dict[str, Any]]:
"""
Gibt ein Schema-Objekt für eine Tabelle zurück mit Datentypen und Labels.
Args:
table: Name der Tabelle
language: Sprache für die Labels (optional)
filter_criteria: Optionale Filterkriterien (nicht implementiert)
Returns:
Schema-Objekt mit Feldern, Datentypen und Labels
"""
# Lade die Tabellendaten
data = self._load_table(table)
schema = {}
if not data:
return schema
# Nehme den ersten Datensatz als Referenz für die Felder und Datentypen
first_record = data[0]
for field, value in first_record.items():
# Bestimme den Datentyp
data_type = type(value).__name__
# Label erstellen (Standardwert ist der Feldname)
label = field
# Wenn model_info verfügbar ist, versuche das Label aus dem Modell zu holen
# Implementierung hängt vom tatsächlichen Modell ab
schema[field] = {
"type": data_type,
"label": label
}
return schema
def get_recordset(self, table: str, field_filter: Dict[str, Any] = None, record_filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""
Gibt eine Liste von Datensätzen aus einer Tabelle zurück, gefiltert nach Kriterien.
Args:
table: Name der Tabelle
field_filter: Filter für Felder (welche Felder zurückgegeben werden sollen)
record_filter: Filter für Datensätze (welche Datensätze zurückgegeben werden sollen)
Returns:
Liste der gefilterten Datensätze
"""
# Lade die Tabellendaten
data = self._load_table(table)
# Filtere nach Mandanten- und Benutzerkontext
filtered_data = self._filter_by_context(data)
# Wende record_filter an, wenn vorhanden
if record_filter:
filtered_data = self._apply_record_filter(filtered_data, record_filter)
# Wenn field_filter vorhanden ist, reduziere die Felder
if field_filter and isinstance(field_filter, list):
result = []
for record in filtered_data:
filtered_record = {}
for field in field_filter:
if field in record:
filtered_record[field] = record[field]
result.append(filtered_record)
return result
return filtered_data
def record_create(self, table: str, record_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Erstellt einen neuen Datensatz in der Tabelle.
Args:
table: Name der Tabelle
record_data: Daten für den neuen Datensatz
Returns:
Der erstellte Datensatz
"""
# Lade die Tabellendaten
data = self._load_table(table)
# Füge mandate_id und user_id hinzu, falls nicht vorhanden
if "mandate_id" not in record_data:
record_data["mandate_id"] = self.mandate_id
if "user_id" not in record_data:
record_data["user_id"] = self.user_id
# Füge den neuen Datensatz hinzu
data.append(record_data)
# Speichere die aktualisierte Tabelle
if self._save_table(table, data):
return record_data
else:
raise ValueError(f"Fehler beim Erstellen des Datensatzes in Tabelle {table}")
def record_delete(self, table: str, record_id: Union[str, int]) -> bool:
"""
Löscht einen Datensatz aus der Tabelle.
Args:
table: Name der Tabelle
record_id: ID des zu löschenden Datensatzes
Returns:
True bei Erfolg, False bei Fehler
"""
# Lade die Tabellendaten
data = self._load_table(table)
# Suche den Datensatz
for i, record in enumerate(data):
if "id" in record and record["id"] == record_id:
# Prüfe, ob der Datensatz zum aktuellen Mandanten gehört
if "mandate_id" in record and record["mandate_id"] != self.mandate_id:
raise ValueError("Not your mandate")
# Lösche den Datensatz
del data[i]
# Speichere die aktualisierte Tabelle
return self._save_table(table, data)
# Datensatz nicht gefunden
return False
def record_modify(self, table: str, record_id: Union[str, int], record_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Ändert einen Datensatz in der Tabelle.
Args:
table: Name der Tabelle
record_id: ID des zu ändernden Datensatzes
record_data: Neue Daten für den Datensatz
Returns:
Der aktualisierte Datensatz
"""
# Lade die Tabellendaten
data = self._load_table(table)
# Suche den Datensatz
for i, record in enumerate(data):
if "id" in record and record["id"] == record_id:
# Prüfe, ob der Datensatz zum aktuellen Mandanten gehört
if "mandate_id" in record and record["mandate_id"] != self.mandate_id:
raise ValueError("Not your mandate")
# Aktualisiere den Datensatz
for key, value in record_data.items():
data[i][key] = value
# Speichere die aktualisierte Tabelle
if self._save_table(table, data):
return data[i]
else:
raise ValueError(f"Fehler beim Aktualisieren des Datensatzes in Tabelle {table}")
# Datensatz nicht gefunden
raise ValueError(f"Datensatz mit ID {record_id} nicht gefunden in Tabelle {table}")