import json
import logging
import os
from typing import List, Dict, Any, Optional, Union

logger = logging.getLogger(__name__)


class DatabaseConnector:
    """
    A connector for JSON-based data storage.
    Provides generic database operations without user/mandate filtering.

    Every table is one ``<table>.json`` file inside ``<dbHost>/<dbDatabase>``
    holding a list of record dicts. A separate ``_system.json`` file maps
    table names to the "initial ID" (the ID of the first record of a table).
    Loaded tables are kept in an in-memory cache; the cache is only updated
    after a file has been written successfully.
    """

    def __init__(self, dbHost: str, dbDatabase: str, dbUser: Optional[str] = None,
                 dbPassword: Optional[str] = None, mandateId: Optional[int] = None,
                 userId: Optional[int] = None, skipInitialIdLookup: bool = False):
        """
        Opens (and if necessary creates) the database folder.

        Args:
            dbHost: Base directory that contains the database folders.
            dbDatabase: Name of the database folder inside dbHost.
            dbUser: Stored on the instance, not used by this class.
            dbPassword: Stored on the instance, not used by this class.
            mandateId: Context mandate ID; 0 means "use the initial mandate ID".
            userId: Context user ID; 0 means "use the initial user ID".
            skipInitialIdLookup: If True, IDs of 0 are kept as they are.

        Raises:
            ValueError: If mandateId or userId is None.
        """
        # Store the input parameters
        self.dbHost = dbHost
        self.dbDatabase = dbDatabase
        self.dbUser = dbUser
        self.dbPassword = dbPassword
        self.skipInitialIdLookup = skipInitialIdLookup

        # Check if context parameters are set
        if mandateId is None or userId is None:
            raise ValueError("mandateId and userId must be set")

        # Ensure the database directory exists
        self.dbFolder = os.path.join(self.dbHost, self.dbDatabase)
        os.makedirs(self.dbFolder, exist_ok=True)

        # Cache for loaded data
        self._tablesCache = {}

        # Initialize system table
        self._systemTableName = "_system"
        self._initializeSystemTable()

        # Temporarily store mandateId and userId
        self._mandateId = mandateId
        self._userId = userId

        # If mandateId or userId are 0 and we're not skipping ID lookup,
        # try to use the initial IDs
        if not skipInitialIdLookup:
            if mandateId == 0:
                initialMandateId = self.getInitialId("mandates")
                if initialMandateId is not None:
                    self._mandateId = initialMandateId
                    logger.info(f"Using initial mandateId: {initialMandateId} instead of 0")
            if userId == 0:
                initialUserId = self.getInitialId("users")
                if initialUserId is not None:
                    self._userId = initialUserId
                    logger.info(f"Using initial userId: {initialUserId} instead of 0")

        # Set the effective IDs as properties
        self.mandateId = self._mandateId
        self.userId = self._userId
        logger.debug(f"Context: mandateId={self.mandateId}, userId={self.userId}")

    # Internal helpers

    @property
    def _systemCacheKey(self) -> str:
        """Cache key of the system table (prefixed to avoid collisions with regular tables)."""
        return f"_{self._systemTableName}"

    @staticmethod
    def _valuesMatch(stored: Any, wanted: Any) -> bool:
        """
        Compares a stored value with a requested value.

        An int and a decimal string are compared numerically in both
        directions (``1`` matches ``"1"``); everything else uses ``==``.
        ``str.isdecimal`` is used as the guard because ``str.isdigit`` also
        accepts characters such as ``"²"`` that ``int()`` rejects.
        """
        if isinstance(wanted, int) and isinstance(stored, str) and stored.isdecimal():
            return wanted == int(stored)
        if isinstance(wanted, str) and wanted.isdecimal() and isinstance(stored, int):
            return stored == int(wanted)
        return stored == wanted

    def _findRecordIndex(self, data: List[Dict[str, Any]],
                         recordId: Union[str, int]) -> Optional[int]:
        """
        Returns the index of the record with the given ID, or None.

        An exact ``==`` match always wins; only if there is none, the first
        record whose ID matches after int/str coercion is returned.
        """
        looseIndex = None
        for index, record in enumerate(data):
            if "id" not in record:
                continue
            if record["id"] == recordId:
                return index
            if looseIndex is None and self._valuesMatch(record["id"], recordId):
                looseIndex = index
        return looseIndex

    def _writeJsonFile(self, path: str, data: Any) -> None:
        """
        Writes data as JSON to path without ever leaving a half-written file.

        The data is serialized first (so a non-serializable value fails
        before anything on disk is touched), written to a temporary file next
        to the target and then moved over the target with ``os.replace``.

        Raises:
            TypeError, ValueError: If data cannot be serialized.
            OSError: If the file cannot be written or replaced.
        """
        payload = json.dumps(data, indent=2, ensure_ascii=False)
        # The ".tmp" suffix keeps the file out of getTables(), which only
        # lists names ending in ".json".
        tempPath = f"{path}.tmp"
        try:
            with open(tempPath, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(tempPath, path)
        except Exception:
            try:
                os.remove(tempPath)
            except OSError:
                pass  # Best-effort cleanup; the original error is re-raised below
            raise

    def _initializeSystemTable(self):
        """Initializes the system table if it doesn't exist yet."""
        systemTablePath = self._getTablePath(self._systemTableName)
        if not os.path.exists(systemTablePath):
            self._saveSystemTable({})
            logger.info(f"System table initialized in {systemTablePath}")
        else:
            # Load existing system table to ensure it's available
            self._loadSystemTable()
            logger.debug(f"Existing system table loaded from {systemTablePath}")

    def _loadSystemTable(self) -> Dict[str, int]:
        """
        Loads the system table with the initial IDs.

        Returns the cached dict if present. If the file is missing or cannot
        be read, an empty dict is cached and returned (errors are logged).
        """
        cacheKey = self._systemCacheKey
        if cacheKey in self._tablesCache:
            return self._tablesCache[cacheKey]

        systemTablePath = self._getTablePath(self._systemTableName)
        try:
            if os.path.exists(systemTablePath):
                with open(systemTablePath, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            else:
                data = {}
        except Exception as e:
            logger.error(f"Error loading the system table: {e}")
            data = {}
        self._tablesCache[cacheKey] = data
        return data

    def _saveSystemTable(self, data: Dict[str, int]) -> bool:
        """Saves the system table; the cache is updated only on success."""
        systemTablePath = self._getTablePath(self._systemTableName)
        try:
            self._writeJsonFile(systemTablePath, data)
        except Exception as e:
            logger.error(f"Error saving the system table: {e}")
            return False
        self._tablesCache[self._systemCacheKey] = data
        return True

    def _getTablePath(self, table: str) -> str:
        """Returns the full path to a table file"""
        return os.path.join(self.dbFolder, f"{table}.json")

    def _loadTable(self, table: str) -> List[Dict[str, Any]]:
        """
        Loads a table from the corresponding JSON file.

        Returns the cached table if present. A missing file is created as an
        empty table. If the file cannot be read, the error is logged and an
        empty list is returned without being cached. For the system table an
        empty list is always returned.
        """
        # The system table is not treated like normal tables
        if table == self._systemTableName:
            return []

        # If the table is already in the cache, use the cache
        if table in self._tablesCache:
            return self._tablesCache[table]

        path = self._getTablePath(table)
        try:
            if os.path.exists(path):
                with open(path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self._tablesCache[table] = data

                # If data was loaded and no initial ID is registered yet,
                # register the ID of the first record (if available)
                if data and not self.hasInitialId(table):
                    if "id" in data[0]:
                        self._registerInitialId(table, data[0]["id"])
                        logger.info(f"Initial ID {data[0]['id']} for table {table} retroactively registered")
                return data

            # If the file doesn't exist, create an empty table
            logger.info(f"New table {table}")
            self._tablesCache[table] = []
            self._saveTable(table, [])
            return []
        except Exception as e:
            logger.error(f"Error loading table {table}: {e}")
            return []

    def _saveTable(self, table: str, data: List[Dict[str, Any]]) -> bool:
        """
        Saves a table to the corresponding JSON file.

        Returns True on success. On failure the error is logged, False is
        returned and neither the existing file nor the cache is changed.
        The system table is refused (returns False).
        """
        # The system table is handled specially
        if table == self._systemTableName:
            return False

        path = self._getTablePath(table)
        try:
            self._writeJsonFile(path, data)
        except Exception as e:
            logger.error(f"Error saving table {table}: {e}")
            return False
        # Update the cache
        self._tablesCache[table] = data
        return True

    def _applyRecordFilter(self, records: List[Dict[str, Any]],
                           recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """
        Applies a record filter to the records.

        A record matches if it contains every field of recordFilter and each
        value matches (ints and decimal strings are compared numerically).
        Without a filter the input list itself is returned.
        """
        if not recordFilter:
            return records

        return [
            record for record in records
            if all(
                field in record and self._valuesMatch(record[field], value)
                for field, value in recordFilter.items()
            )
        ]

    def _registerInitialId(self, table: str, initialId: int) -> bool:
        """
        Registers the initial ID for a table.

        Returns True if the ID was stored or one was already registered.
        """
        try:
            # Work on a copy so a failed save does not alter the cached table
            systemData = dict(self._loadSystemTable())

            if table in systemData:
                return True  # If already present, this is not an error

            systemData[table] = initialId
            success = self._saveSystemTable(systemData)
            if success:
                logger.info(f"Initial ID {initialId} for table {table} registered")
            return success
        except Exception as e:
            logger.error(f"Error registering the initial ID for table {table}: {e}")
            return False

    def _removeInitialId(self, table: str) -> bool:
        """
        Removes the initial ID for a table from the system table.

        Returns True if the ID was removed or none was registered.
        """
        try:
            # Work on a copy so a failed save does not alter the cached table
            systemData = dict(self._loadSystemTable())

            if table not in systemData:
                return True  # If not present, this is not an error

            del systemData[table]
            success = self._saveSystemTable(systemData)
            if success:
                logger.info(f"Initial ID for table {table} removed from system table")
            return success
        except Exception as e:
            logger.error(f"Error removing initial ID for table {table}: {e}")
            return False

    # Public API

    def getTables(self) -> List[str]:
        """Returns a list of all available tables (files starting with '_' are skipped)."""
        tables = []
        try:
            for filename in os.listdir(self.dbFolder):
                if filename.endswith('.json') and not filename.startswith('_'):
                    tables.append(filename[:-5])  # Remove the .json extension
        except Exception as e:
            logger.error(f"Error reading the database directory: {e}")
        return tables

    def getFields(self, table: str) -> List[str]:
        """Returns the field names of the first record of a table ([] if the table is empty)."""
        data = self._loadTable(table)
        if not data:
            return []
        return list(data[0].keys())

    def getSchema(self, table: str, language: str = None) -> Dict[str, Dict[str, Any]]:
        """
        Returns a schema object for a table with data types and labels.

        The schema is derived from the first record: the type is the Python
        type name of the value, the label is the field name. The language
        parameter is currently not used.
        """
        data = self._loadTable(table)
        if not data:
            return {}

        return {
            field: {"type": type(value).__name__, "label": field}
            for field, value in data[0].items()
        }

    def getRecordset(self, table: str, fieldFilter: List[str] = None,
                     recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """
        Returns a list of records from a table, filtered by criteria.

        Args:
            table: Name of the table.
            fieldFilter: If given as a list, each returned record only
                contains these fields (fields missing in a record are skipped).
            recordFilter: If given, only records matching all field/value
                pairs are returned.

        Returns:
            A new list; modifying the list does not change the cached table.
        """
        data = self._loadTable(table)

        # Apply recordFilter if available
        if recordFilter:
            data = self._applyRecordFilter(data, recordFilter)

        # If fieldFilter is available, reduce the fields
        if fieldFilter and isinstance(fieldFilter, list):
            return [
                {field: record[field] for field in fieldFilter if field in record}
                for record in data
            ]

        # Shallow copy so callers cannot alter the cached list by accident
        return list(data)

    def recordCreate(self, table: str, recordData: Dict[str, Any]) -> Dict[str, Any]:
        """
        Creates a new record in the table.

        recordData is completed in place: missing or zero mandateId/userId
        are replaced by the connector's context IDs and a missing id is set
        to the highest existing id plus one (1 if no record has an id).

        Returns:
            The completed recordData.

        Raises:
            ValueError: If the table could not be saved. In that case neither
                the file nor the cached table is changed.
        """
        data = self._loadTable(table)
        tableWasEmpty = not data

        # Add mandateId and userId if not present
        if recordData.get("mandateId", 0) == 0:
            recordData["mandateId"] = self.mandateId
        if recordData.get("userId", 0) == 0:
            recordData["userId"] = self.userId

        # Determine the next ID if not present; default=0 covers tables
        # whose records carry no "id" at all
        if "id" not in recordData:
            highestId = max((record["id"] for record in data if "id" in record), default=0)
            recordData["id"] = highestId + 1

        # Save a new list so the cache stays untouched if saving fails
        if not self._saveTable(table, data + [recordData]):
            raise ValueError(f"Error creating the record in table {table}")

        # Register the first record of a table only once it is really stored
        if tableWasEmpty:
            self._registerInitialId(table, recordData["id"])
            logger.info(f"Initial ID {recordData['id']} for table {table} has been registered")

        return recordData

    def recordDelete(self, table: str, recordId: Union[str, int]) -> bool:
        """
        Deletes a record from the table.

        recordId may be given as int or decimal string. If the deleted
        record is the table's initial record, its initial ID is removed
        from the system table.

        Returns:
            True if the record was deleted, False if it was not found or
            the table could not be saved.
        """
        data = self._loadTable(table)
        index = self._findRecordIndex(data, recordId)
        if index is None:
            return False

        storedId = data[index]["id"]

        # Save a new list so the cache stays untouched if saving fails
        if not self._saveTable(table, data[:index] + data[index + 1:]):
            return False

        # Drop the initial ID only after the record is really gone
        initialId = self.getInitialId(table)
        if initialId is not None and self._valuesMatch(initialId, storedId):
            self._removeInitialId(table)
            logger.info(f"Initial ID {recordId} for table {table} has been removed from the system table")

        return True

    def recordModify(self, table: str, recordId: Union[str, int],
                     recordData: Dict[str, Any]) -> Dict[str, Any]:
        """
        Modifies a record in the table.

        recordId may be given as int or decimal string. All fields of
        recordData are written to the record; the stored id is kept as is.

        Returns:
            The updated record.

        Raises:
            ValueError: If the record does not exist, if recordData contains
                a different id, or if the table could not be saved. In the
                last case neither the file nor the cached table is changed.
        """
        data = self._loadTable(table)
        index = self._findRecordIndex(data, recordId)
        if index is None:
            raise ValueError(f"Record with ID {recordId} not found in table {table}")

        current = data[index]

        # Prevent changing the ID
        if "id" in recordData and not self._valuesMatch(current["id"], recordData["id"]):
            raise ValueError(f"The ID of a record in table {table} cannot be changed")

        # Build the updated record and table as copies so the cache stays
        # untouched if saving fails; "id" is skipped to keep its stored type
        updated = dict(current)
        updated.update((key, value) for key, value in recordData.items() if key != "id")
        newData = list(data)
        newData[index] = updated

        if not self._saveTable(table, newData):
            raise ValueError(f"Error updating record in table {table}")
        return updated

    def hasInitialId(self, table: str) -> bool:
        """Checks if an initial ID is registered for a table."""
        return table in self._loadSystemTable()

    def getInitialId(self, table: str) -> Optional[int]:
        """Returns the initial ID for a table, or None if none is registered."""
        initialId = self._loadSystemTable().get(table)
        logger.debug(f"Database '{self.dbDatabase}': Initial ID for table '{table}' is {initialId}")
        if initialId is None:
            logger.debug(f"No initial ID found for table {table}")
        return initialId

    def getAllInitialIds(self) -> Dict[str, int]:
        """Returns all registered initial IDs."""
        return self._loadSystemTable().copy()  # Return a copy to protect the original