import json
import logging
import os
from typing import Any, Dict, List, Optional, Union

logger = logging.getLogger(__name__)


class DatabaseConnector:
    """
    A connector for JSON-based data storage.

    Every table is stored as one JSON file (a list of record dicts) inside
    ``<dbHost>/<dbDatabase>/``. A separate ``_system.json`` file maps table
    names to the "initial ID" (the ID of the first record created in that
    table). Loaded tables are kept in an in-memory cache.

    Provides generic database operations with tenant (mandate) and user
    context support.
    """

    def __init__(self, dbHost: str, dbDatabase: str, dbUser: Optional[str] = None,
                 dbPassword: Optional[str] = None, mandateId: Optional[int] = None,
                 userId: Optional[int] = None, skipInitialIdLookup: bool = False):
        """
        Initializes the JSON database connector.

        Args:
            dbHost: Directory for the JSON files
            dbDatabase: Database name (sub-directory of dbHost)
            dbUser: Username for authentication (optional, only stored)
            dbPassword: API key for authentication (optional, only stored)
            mandateId: Context parameter for the tenant
            userId: Context parameter for the user
            skipInitialIdLookup: When True, skips replacing a mandateId/userId
                of 0 with the initial IDs of the "mandates"/"users" tables

        Raises:
            ValueError: If mandateId or userId is None
        """
        # Store the input parameters
        self.dbHost = dbHost
        self.dbDatabase = dbDatabase
        self.dbUser = dbUser
        self.dbPassword = dbPassword
        self.skipInitialIdLookup = skipInitialIdLookup

        # Both context parameters are mandatory
        if mandateId is None or userId is None:
            raise ValueError("mandateId and userId must be set")

        # Ensure the database directory exists
        self.dbFolder = os.path.join(self.dbHost, self.dbDatabase)
        os.makedirs(self.dbFolder, exist_ok=True)

        # Cache for loaded data
        self._tablesCache = {}

        # Initialize system table
        self._systemTableName = "_system"
        self._initializeSystemTable()

        # Temporarily store mandateId and userId
        self._mandateId = mandateId
        self._userId = userId

        # An ID of 0 is a placeholder: unless the lookup is skipped, replace
        # it with the registered initial ID of the corresponding table.
        if not skipInitialIdLookup:
            if mandateId == 0:
                initialMandateId = self.getInitialId("mandates")
                if initialMandateId is not None:
                    self._mandateId = initialMandateId
                    logger.info("Using initial mandateId: %s instead of 0", initialMandateId)
            if userId == 0:
                initialUserId = self.getInitialId("users")
                if initialUserId is not None:
                    self._userId = initialUserId
                    logger.info("Using initial userId: %s instead of 0", initialUserId)

        # Set the effective IDs as properties
        self.mandateId = self._mandateId
        self.userId = self._userId

        logger.info("DatabaseConnector initialized for directory: %s", self.dbFolder)
        logger.debug("Context: mandateId=%s, userId=%s", self.mandateId, self.userId)

    # Internal helpers

    @property
    def _systemCacheKey(self) -> str:
        """Cache key of the system table (prefixed to avoid colliding with regular tables)."""
        return f"_{self._systemTableName}"

    @staticmethod
    def _writeJsonFile(path: str, data: Any) -> None:
        """
        Serializes data to path without ever leaving a half-written file.

        The JSON is written to a temporary sibling file first and then moved
        over the target with os.replace, so a serialization or I/O error
        leaves the previous file content untouched.

        Raises:
            OSError, TypeError, ValueError, RecursionError: Propagated from
                open/json.dump/os.replace after the temporary file is removed
        """
        tempPath = f"{path}.tmp"
        try:
            with open(tempPath, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            os.replace(tempPath, path)
        except Exception:
            # Best-effort cleanup; the original error is the one worth reporting
            try:
                os.remove(tempPath)
            except OSError:
                pass
            raise

    @staticmethod
    def _valuesMatch(expected: Any, actual: Any) -> bool:
        """
        Compares two values, treating an int and its decimal string as equal.

        Args:
            expected: The value searched for (filter value or requested ID)
            actual: The value stored in the record

        Returns:
            True if both values are equal, directly or after int conversion
        """
        # isdecimal() (not isdigit()) so that int() cannot fail on characters
        # such as superscript digits.
        if isinstance(expected, int) and isinstance(actual, str) and actual.isdecimal():
            return expected == int(actual)
        if isinstance(expected, str) and expected.isdecimal() and isinstance(actual, int):
            return actual == int(expected)
        return actual == expected

    @staticmethod
    def _nextId(records: List[Dict[str, Any]]) -> Any:
        """Returns the highest numeric record ID plus one (1 if there is none)."""
        numericIds = []
        for record in records:
            if not isinstance(record, dict) or "id" not in record:
                continue
            recordId = record["id"]
            if isinstance(recordId, str) and recordId.isdecimal():
                recordId = int(recordId)
            if isinstance(recordId, (int, float)):
                numericIds.append(recordId)
        return max(numericIds, default=0) + 1

    def _initializeSystemTable(self):
        """Initializes the system table if it doesn't exist yet, otherwise loads it."""
        systemTablePath = self._getTablePath(self._systemTableName)
        if os.path.exists(systemTablePath):
            # Load existing system table to ensure it's available
            self._loadSystemTable()
            logger.debug("Existing system table loaded from %s", systemTablePath)
        elif self._saveSystemTable({}):
            logger.info("System table initialized in %s", systemTablePath)

    def _loadSystemTable(self) -> Dict[str, int]:
        """
        Loads the system table with the initial IDs.

        Returns:
            The cached mapping of table name to initial ID. An unreadable or
            malformed file is logged and treated as an empty mapping.
        """
        cacheKey = self._systemCacheKey
        if cacheKey in self._tablesCache:
            return self._tablesCache[cacheKey]

        systemTablePath = self._getTablePath(self._systemTableName)
        data = {}
        if os.path.exists(systemTablePath):
            try:
                with open(systemTablePath, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                logger.error("Error loading the system table: %s", e)
            else:
                if isinstance(loaded, dict):
                    data = loaded
                else:
                    # Callers index this by table name, so anything but an
                    # object would break them later.
                    logger.error(
                        "Error loading the system table: expected a JSON object, got %s",
                        type(loaded).__name__)

        self._tablesCache[cacheKey] = data
        return data

    def _saveSystemTable(self, data: Dict[str, int]) -> bool:
        """
        Saves the system table with the initial IDs.

        Returns:
            True on success, False on error (the cache is only updated on success)
        """
        systemTablePath = self._getTablePath(self._systemTableName)
        try:
            self._writeJsonFile(systemTablePath, data)
        except (OSError, TypeError, ValueError, RecursionError) as e:
            logger.error("Error saving the system table: %s", e)
            return False
        self._tablesCache[self._systemCacheKey] = data
        return True

    def _getTablePath(self, table: str) -> str:
        """Returns the full path to a table file"""
        return os.path.join(self.dbFolder, f"{table}.json")

    def _loadTable(self, table: str) -> List[Dict[str, Any]]:
        """
        Loads a table from the corresponding JSON file.

        The returned list is the cached object itself; mutating it mutates
        the cache. A missing file is created as an empty table. If the file
        cannot be read, or does not contain a JSON list, the error is logged
        and an empty, non-cached list is returned.

        Args:
            table: Name of the table

        Returns:
            List of records (always empty for the system table)
        """
        # The system table is not treated like normal tables
        if table == self._systemTableName:
            return []

        if table in self._tablesCache:
            return self._tablesCache[table]

        path = self._getTablePath(table)
        if not os.path.exists(path):
            # If the file doesn't exist, create an empty table
            logger.info("New table %s", table)
            emptyTable = []
            self._tablesCache[table] = emptyTable
            self._saveTable(table, emptyTable)
            return emptyTable

        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            logger.error("Error loading table %s: %s", table, e)
            return []

        if not isinstance(data, list):
            # Validate before caching so a malformed file never ends up
            # being served from the cache.
            logger.error("Error loading table %s: expected a JSON list, got %s",
                         table, type(data).__name__)
            return []

        self._tablesCache[table] = data

        # If data was loaded and no initial ID is registered yet,
        # register the ID of the first record (if available)
        if data and not self.hasInitialId(table):
            firstRecord = data[0]
            if isinstance(firstRecord, dict) and "id" in firstRecord:
                if self._registerInitialId(table, firstRecord["id"]):
                    logger.info("Initial ID %s for table %s retroactively registered",
                                firstRecord["id"], table)
        return data

    def _saveTable(self, table: str, data: List[Dict[str, Any]]) -> bool:
        """
        Saves a table to the corresponding JSON file.

        Args:
            table: Name of the table
            data: Complete list of records to store

        Returns:
            True on success, False on error or when table is the system table.
            On error the previous file content and the cache are left untouched.
        """
        # The system table is handled specially
        if table == self._systemTableName:
            return False

        try:
            self._writeJsonFile(self._getTablePath(table), data)
        except (OSError, TypeError, ValueError, RecursionError) as e:
            logger.error("Error saving table %s: %s", table, e)
            return False
        self._tablesCache[table] = data
        return True

    def _filterByContext(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Filters records by tenant context, if the context fields exist in the record.

        A field counts as present when its value is neither None nor "".
        - mandateId present: kept only if it equals self.mandateId
        - neither mandateId nor userId present: always kept
        - only userId present: dropped

        NOTE(review): userId is never compared against self.userId and
        records carrying only a userId are excluded - confirm this is intended.
        """
        visibleRecords = []
        for record in records:
            mandate = record.get("mandateId")
            user = record.get("userId")
            hasMandate = mandate is not None and mandate != ""
            hasUser = user is not None and user != ""

            if hasMandate:
                if mandate == self.mandateId:
                    visibleRecords.append(record)
            elif not hasUser:
                visibleRecords.append(record)
        return visibleRecords

    def _applyRecordFilter(self, records: List[Dict[str, Any]],
                           recordFilter: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """
        Applies a record filter to the records.

        A record matches when every filter field exists in it and its value
        equals the filter value (ints and their decimal strings compare equal).

        Args:
            records: Records to filter
            recordFilter: Mapping of field name to required value; an empty
                or missing filter returns records unchanged

        Returns:
            List of matching records
        """
        if not recordFilter:
            return records

        return [
            record for record in records
            if all(field in record and self._valuesMatch(value, record[field])
                   for field, value in recordFilter.items())
        ]

    def _registerInitialId(self, table: str, initialId: int) -> bool:
        """
        Registers the initial ID for a table.

        Args:
            table: Name of the table
            initialId: The initial ID

        Returns:
            True on success (or if an ID is already registered), False on error
        """
        systemData = self._loadSystemTable()

        # Only register if not already present; an existing entry is not an error
        if table in systemData:
            return True

        systemData[table] = initialId
        success = self._saveSystemTable(systemData)
        if success:
            logger.info("Initial ID %s for table %s registered", initialId, table)
        return success

    def _removeInitialId(self, table: str) -> bool:
        """
        Removes the initial ID for a table from the system table.

        Args:
            table: Name of the table

        Returns:
            True on success (or if no ID was registered), False on error
        """
        systemData = self._loadSystemTable()

        # A missing entry is not an error
        if table not in systemData:
            return True

        del systemData[table]
        success = self._saveSystemTable(systemData)
        if success:
            logger.info("Initial ID for table %s removed from system table", table)
        return success

    # Public API

    def getTables(self) -> List[str]:
        """
        Returns a list of all available tables.

        Files starting with an underscore (such as the system table) are skipped.

        Returns:
            List of table names
        """
        try:
            filenames = os.listdir(self.dbFolder)
        except OSError as e:
            logger.error("Error reading the database directory: %s", e)
            return []

        return [
            filename[:-5]  # Remove the .json extension
            for filename in filenames
            if filename.endswith('.json') and not filename.startswith('_')
        ]

    def getFields(self, table: str) -> List[str]:
        """
        Returns a list of all fields in a table.

        Args:
            table: Name of the table

        Returns:
            List of field names, taken from the first record (empty if the
            table has no records)
        """
        data = self._loadTable(table)
        if not data:
            return []

        # Take the first record as a reference for the fields
        return list(data[0].keys())

    def getSchema(self, table: str, language: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
        """
        Returns a schema object for a table with data types and labels.

        Args:
            table: Name of the table
            language: Language for the labels (optional, currently unused)

        Returns:
            Mapping of field name to {"type": <Python type name of the value
            in the first record>, "label": <field name>}; empty if the table
            has no records
        """
        data = self._loadTable(table)
        if not data:
            return {}

        # Take the first record as a reference for the fields and data types
        return {
            field: {"type": type(value).__name__, "label": field}
            for field, value in data[0].items()
        }

    def getRecordset(self, table: str, fieldFilter: Optional[List[str]] = None,
                     recordFilter: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """
        Returns a list of records from a table, filtered by criteria.

        Args:
            table: Name of the table
            fieldFilter: Filter for fields (which fields should be returned)
            recordFilter: Filter for records (which records should be returned)

        Returns:
            List of filtered records. Without fieldFilter the stored record
            dicts themselves are returned; with fieldFilter new dicts holding
            only the requested fields that exist in each record.
        """
        data = self._loadTable(table)
        logger.debug("getRecordset: loaded %d records from table %s", len(data), table)

        # Filter by tenant and user context
        filteredData = self._filterByContext(data)

        if recordFilter:
            filteredData = self._applyRecordFilter(filteredData, recordFilter)

        # If fieldFilter is available, reduce the fields
        if fieldFilter and isinstance(fieldFilter, list):
            return [
                {field: record[field] for field in fieldFilter if field in record}
                for record in filteredData
            ]

        return filteredData

    def recordCreate(self, table: str, recordData: Dict[str, Any]) -> Dict[str, Any]:
        """
        Creates a new record in the table.

        recordData is completed in place: a missing or 0 mandateId/userId is
        replaced by the connector's context, and a missing id is set to the
        highest existing numeric ID plus one. The same dict is stored and
        returned.

        Args:
            table: Name of the table
            recordData: Data for the new record

        Returns:
            The created record

        Raises:
            ValueError: If the table could not be saved; the table is left
                unchanged in that case
        """
        data = self._loadTable(table)

        # Add mandateId and userId if not present or 0
        if "mandateId" not in recordData or recordData["mandateId"] == 0:
            recordData["mandateId"] = self.mandateId
        if "userId" not in recordData or recordData["userId"] == 0:
            recordData["userId"] = self.userId

        # Determine the next ID if not present
        if "id" not in recordData:
            recordData["id"] = self._nextId(data)

        tableWasEmpty = not data
        data.append(recordData)

        if not self._saveTable(table, data):
            # Undo the append so the cache keeps matching the file on disk
            data.pop()
            raise ValueError(f"Error creating the record in table {table}")

        # The first record of a table defines its initial ID; register it
        # only once the record has really been persisted.
        if tableWasEmpty and self._registerInitialId(table, recordData["id"]):
            logger.info("Initial ID %s for table %s has been registered",
                        recordData["id"], table)

        return recordData

    def recordDelete(self, table: str, recordId: Union[str, int]) -> bool:
        """
        Deletes a record from the table.

        Args:
            table: Name of the table
            recordId: ID of the record to delete (an int and its decimal
                string are treated as the same ID)

        Returns:
            True on success, False if the record was not found or the table
            could not be saved

        Raises:
            ValueError: If the record belongs to a different mandate
        """
        data = self._loadTable(table)

        for i, record in enumerate(data):
            if "id" not in record or not self._valuesMatch(recordId, record["id"]):
                continue

            # Check if the record belongs to the current mandate
            if "mandateId" in record and record["mandateId"] != self.mandateId:
                raise ValueError("Not your mandate")

            del data[i]
            if not self._saveTable(table, data):
                # Restore the record so the cache keeps matching the file
                data.insert(i, record)
                return False

            # If the initial record was deleted, drop its system table entry
            initialId = self.getInitialId(table)
            if initialId is not None and self._valuesMatch(recordId, initialId):
                if self._removeInitialId(table):
                    logger.info(
                        "Initial ID %s for table %s has been removed from the system table",
                        recordId, table)
            return True

        # Record not found
        return False

    def recordModify(self, table: str, recordId: Union[str, int],
                     recordData: Dict[str, Any]) -> Dict[str, Any]:
        """
        Modifies a record in the table.

        Args:
            table: Name of the table
            recordId: ID of the record to modify (an int and its decimal
                string are treated as the same ID)
            recordData: New data for the record; fields not listed keep
                their current value

        Returns:
            The updated record

        Raises:
            ValueError: If the record is not found, belongs to a different
                mandate, recordData tries to change the ID, or the table
                could not be saved (the record is left unchanged then)
        """
        data = self._loadTable(table)

        for record in data:
            if "id" not in record or not self._valuesMatch(recordId, record["id"]):
                continue

            # Check if the record belongs to the current mandate
            if "mandateId" in record and record["mandateId"] != self.mandateId:
                raise ValueError("Not your mandate")

            # Prevent changing the ID
            if "id" in recordData and not self._valuesMatch(recordData["id"], record["id"]):
                raise ValueError(f"The ID of a record in table {table} cannot be changed")

            previousState = dict(record)
            for key, value in recordData.items():
                # The stored ID is kept as is so its type never changes
                if key != "id":
                    record[key] = value

            if self._saveTable(table, data):
                return record

            # Roll back so the cache keeps matching the file on disk
            record.clear()
            record.update(previousState)
            raise ValueError(f"Error updating record in table {table}")

        # Record not found
        raise ValueError(f"Record with ID {recordId} not found in table {table}")

    def hasInitialId(self, table: str) -> bool:
        """
        Checks if an initial ID is registered for a table.

        Args:
            table: Name of the table

        Returns:
            True if an initial ID is registered, otherwise False
        """
        return table in self._loadSystemTable()

    def getInitialId(self, table: str) -> Optional[int]:
        """
        Returns the initial ID for a table.

        Args:
            table: Name of the table

        Returns:
            The initial ID or None if not present
        """
        initialId = self._loadSystemTable().get(table)
        logger.debug("Database '%s': Initial ID for table '%s' is %s",
                     self.dbDatabase, table, initialId)
        if initialId is None:
            logger.debug("No initial ID found for table %s", table)
        return initialId

    def getAllInitialIds(self) -> Dict[str, int]:
        """
        Returns all registered initial IDs.

        Returns:
            Dictionary with table names as keys and initial IDs as values
            (a copy, so changes do not affect the system table)
        """
        return self._loadSystemTable().copy()