import json
import logging
import os
import threading
import time
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, TypedDict, Union

from pydantic import BaseModel

from modules.shared.attributeUtils import to_dict
from modules.shared.timezoneUtils import get_utc_timestamp

logger = logging.getLogger(__name__)


class TableCache(TypedDict):
    """Type definition for table cache entries"""
    recordIds: List[str]


class DatabaseConnector:
    """
    A connector for JSON-based data storage.
    Provides generic database operations without user/mandate filtering.
    Stores tables as folders and records as individual files.
    """

    def __init__(self, dbHost: str, dbDatabase: str, dbUser: Optional[str] = None,
                 dbPassword: Optional[str] = None, userId: Optional[str] = None):
        """Create a connector rooted at <dbHost>/<dbDatabase>.

        Args:
            dbHost: Base directory that holds all databases.
            dbDatabase: Database name; becomes a sub-directory of dbHost.
            dbUser: Stored for API compatibility; not used by the JSON backend.
            dbPassword: Stored for API compatibility; not used by the JSON backend.
            userId: Identity written into record audit fields; defaults to "".
        """
        # Store the input parameters
        self.dbHost = dbHost
        self.dbDatabase = dbDatabase
        self.dbUser = dbUser
        self.dbPassword = dbPassword

        # Default to empty string so audit fields are always strings.
        self.userId = userId if userId is not None else ""

        # Initialize database system (creates the directory tree).
        self.initDbSystem()

        # Set up database folder path
        self.dbFolder = os.path.join(self.dbHost, self.dbDatabase)

        # Cache for fully loaded tables: table name -> list of record dicts.
        self._tablesCache: Dict[str, List[Dict[str, Any]]] = {}
        # Cache for table metadata (record IDs, etc.)
        self._tableMetadataCache: Dict[str, TableCache] = {}

        # File locks with timeout protection
        self._file_locks = {}
        self._lock_manager = threading.Lock()
        self._lock_timeouts = {}  # Track when locks were acquired

        # Initialize system table
        self._systemTableName = "_system"
        self._initializeSystemTable()

        logger.debug(f"Context: userId={self.userId}")

    def initDbSystem(self):
        """Initialize the database system - creates necessary directories and structure.

        Raises:
            Exception: Re-raised if the database directory cannot be created.
        """
        try:
            # Ensure the database directory exists
            self.dbFolder = os.path.join(self.dbHost, self.dbDatabase)
            os.makedirs(self.dbFolder, exist_ok=True)
            logger.info(f"Database system initialized: {self.dbFolder}")
        except Exception as e:
            logger.error(f"Error initializing database system: {e}")
            raise

    def _initializeSystemTable(self):
        """Initializes the system table if it doesn't exist yet."""
        systemTablePath = self._getTablePath(self._systemTableName)
        if not os.path.exists(systemTablePath):
            emptySystemTable = {}
            self._saveSystemTable(emptySystemTable)
            logger.info(f"System table initialized in {systemTablePath}")
        else:
            # Load existing system table to ensure it's available
            self._loadSystemTable()
            logger.debug(f"Existing system table loaded from {systemTablePath}")

    def _loadSystemTable(self) -> Dict[str, str]:
        """Loads the system table with the initial IDs.

        Returns:
            Mapping of table name -> initial record ID; {} on error or
            if the system table file does not exist yet.
        """
        # Check if system table is in cache
        if f"_{self._systemTableName}" in self._tablesCache:
            return self._tablesCache[f"_{self._systemTableName}"]

        systemTablePath = self._getTablePath(self._systemTableName)
        try:
            if os.path.exists(systemTablePath):
                with open(systemTablePath, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # Store in cache with special prefix to avoid collision with regular tables
                self._tablesCache[f"_{self._systemTableName}"] = data
                return data
            else:
                self._tablesCache[f"_{self._systemTableName}"] = {}
                return {}
        except Exception as e:
            logger.error(f"Error loading the system table: {e}")
            self._tablesCache[f"_{self._systemTableName}"] = {}
            return {}

    def _saveSystemTable(self, data: Dict[str, str]) -> bool:
        """Saves the system table with the initial IDs.

        Returns:
            True on success, False if the file could not be written.
        """
        systemTablePath = self._getTablePath(self._systemTableName)
        try:
            with open(systemTablePath, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            # Update cache
            self._tablesCache[f"_{self._systemTableName}"] = data
            return True
        except Exception as e:
            logger.error(f"Error saving the system table: {e}")
            return False

    def _getTablePath(self, table: str) -> str:
        """Returns the full path to a table folder"""
        return os.path.join(self.dbFolder, table)

    def _getRecordPath(self, table: str, recordId: str) -> str:
        """Returns the full path to a record file"""
        return os.path.join(self._getTablePath(table), f"{recordId}.json")

    def _get_file_lock(self, filepath: str, timeout_seconds: int = 30):
        """Get file lock with timeout protection.

        If a lock has been held longer than timeout_seconds it is considered
        stale: we attempt a force release and replace it with a fresh lock.
        NOTE(review): releasing a lock owned by another thread raises
        RuntimeError in CPython; the replacement lock is the actual recovery.
        """
        with self._lock_manager:
            if filepath not in self._file_locks:
                self._file_locks[filepath] = threading.Lock()
            lock = self._file_locks[filepath]
            # Check if lock is stale (held too long)
            if filepath in self._lock_timeouts:
                lock_age = time.time() - self._lock_timeouts[filepath]
                if lock_age > timeout_seconds:
                    logger.warning(f"Stale lock detected for {filepath}, age: {lock_age}s")
                    # Force release stale lock
                    try:
                        lock.release()
                    except RuntimeError:
                        # Lock was not held, or is owned by another thread.
                        pass
                    # Create new lock
                    self._file_locks[filepath] = threading.Lock()
                    lock = self._file_locks[filepath]
            return lock

    def _get_table_lock(self, table: str, timeout_seconds: int = 30):
        """Get table-level lock for metadata operations"""
        table_lock_key = f"table_{table}"
        return self._get_file_lock(table_lock_key, timeout_seconds)

    def _ensureTableDirectory(self, table: str) -> bool:
        """Ensures the table directory exists.

        The system table is a single file, not a folder, so it is skipped.
        """
        if table == self._systemTableName:
            return True
        tablePath = self._getTablePath(table)
        try:
            os.makedirs(tablePath, exist_ok=True)
            return True
        except Exception as e:
            logger.error(f"Error creating table directory {tablePath}: {e}")
            return False

    def _loadTableMetadata(self, table: str) -> Dict[str, Any]:
        """Loads table metadata (list of record IDs) without loading actual records.

        NOTE: This method is safe to call without additional locking.
        """
        if table in self._tableMetadataCache:
            return self._tableMetadataCache[table]

        # Ensure table directory exists
        if not self._ensureTableDirectory(table):
            return {"recordIds": []}

        tablePath = self._getTablePath(table)
        metadata = {"recordIds": []}
        try:
            if os.path.exists(tablePath):
                for fileName in os.listdir(tablePath):
                    if fileName.endswith('.json') and fileName != '_metadata.json':
                        recordId = fileName[:-5]  # Remove .json extension
                        metadata["recordIds"].append(recordId)
                metadata["recordIds"].sort()
            self._tableMetadataCache[table] = metadata
        except Exception as e:
            logger.error(f"Error loading table metadata for {table}: {e}")
        return metadata

    def _loadRecord(self, table: str, recordId: str) -> Optional[Dict[str, Any]]:
        """Loads a single record from the table.

        Returns:
            The record dict, or None if the file is missing or unreadable.
        """
        recordPath = self._getRecordPath(table, recordId)
        try:
            if os.path.exists(recordPath):
                with open(recordPath, 'r', encoding='utf-8') as f:
                    record = json.load(f)
                return record
        except Exception as e:
            logger.error(f"Error loading record {recordId} from table {table}: {e}")
        return None

    def _saveRecord(self, table: str, recordId: str, record: Dict[str, Any]) -> bool:
        """Saves a single record to the table with atomic metadata operations.

        Acquires the record lock first, then the table lock; writes the record
        via temp-file + os.replace so the on-disk file is never half-written.

        Returns:
            True on success, False on any error (error is logged).
        """
        recordPath = self._getRecordPath(table, recordId)
        record_lock = self._get_file_lock(recordPath)
        table_lock = self._get_table_lock(table)
        try:
            # Acquire both locks with timeout - record lock first, then table lock
            if not record_lock.acquire(timeout=30):
                raise TimeoutError(f"Could not acquire record lock for {recordPath} within 30 seconds")
            if not table_lock.acquire(timeout=30):
                record_lock.release()
                raise TimeoutError(f"Could not acquire table lock for {table} within 30 seconds")

            # Record lock acquisition time
            self._lock_timeouts[recordPath] = time.time()
            self._lock_timeouts[f"table_{table}"] = time.time()

            # Ensure table directory exists
            if not self._ensureTableDirectory(table):
                raise ValueError(f"Error creating table directory for {table}")

            # Ensure recordId is a string
            recordId = str(recordId)

            # CRITICAL: Ensure record ID matches the file name
            if "id" in record and str(record["id"]) != recordId:
                logger.error(f"Record ID mismatch: file name ID ({recordId}) does not match record ID ({record['id']})")
                raise ValueError(f"Record ID mismatch: file name ID ({recordId}) does not match record ID ({record['id']})")

            # Add metadata
            currentTime = get_utc_timestamp()
            if "_createdAt" not in record:
                record["_createdAt"] = currentTime
                record["_createdBy"] = self.userId
            record["_modifiedAt"] = currentTime
            record["_modifiedBy"] = self.userId

            # Save the record file using atomic write
            tempPath = recordPath + '.tmp'
            # Ensure directory exists
            os.makedirs(os.path.dirname(recordPath), exist_ok=True)
            # Write to temporary file first
            with open(tempPath, 'w', encoding='utf-8') as f:
                json.dump(record, f, indent=2, ensure_ascii=False)

            # Verify the temporary file can be read back (validation)
            try:
                with open(tempPath, 'r', encoding='utf-8') as f:
                    json.load(f)  # This will fail if file is corrupted
            except Exception as e:
                logger.error(f"Validation failed for record {recordId}: {e}")
                # Clean up temp file
                if os.path.exists(tempPath):
                    os.remove(tempPath)
                raise ValueError(f"Record validation failed: {e}") from e

            # Atomic move from temp to final location
            os.replace(tempPath, recordPath)

            # ATOMIC: Update metadata while holding both locks
            metadata = self._loadTableMetadata(table)
            if recordId not in metadata["recordIds"]:
                metadata["recordIds"].append(recordId)
                metadata["recordIds"].sort()
                self._saveTableMetadata(table, metadata)

            # Update cache if it exists (also protected by table lock)
            if table in self._tablesCache:
                # Find and update existing record or append new one
                found = False
                for i, existing_record in enumerate(self._tablesCache[table]):
                    if str(existing_record.get("id")) == recordId:
                        self._tablesCache[table][i] = record
                        found = True
                        break
                if not found:
                    self._tablesCache[table].append(record)
            return True
        except Exception as e:
            logger.error(f"Error saving record {recordId} to table {table}: {e}")
            # Clean up temp file if it exists
            tempPath = self._getRecordPath(table, recordId) + '.tmp'
            if os.path.exists(tempPath):
                try:
                    os.remove(tempPath)
                except OSError:
                    # Best-effort cleanup; the save already failed.
                    pass
            return False
        finally:
            # ALWAYS release both locks, even on error
            try:
                if table_lock.locked():
                    table_lock.release()
                if f"table_{table}" in self._lock_timeouts:
                    del self._lock_timeouts[f"table_{table}"]
            except Exception as release_error:
                logger.error(f"Error releasing table lock for {table}: {release_error}")
            try:
                if record_lock.locked():
                    record_lock.release()
                if recordPath in self._lock_timeouts:
                    del self._lock_timeouts[recordPath]
            except Exception as release_error:
                logger.error(f"Error releasing record lock for {recordPath}: {release_error}")

    def _loadTable(self, table: str) -> List[Dict[str, Any]]:
        """Loads all records from a table folder."""
        # If the table is the system table, load it directly
        if table == self._systemTableName:
            return self._loadSystemTable()

        # If the table is already in the cache, use the cache
        if table in self._tablesCache:
            return self._tablesCache[table]

        # Load metadata first
        metadata = self._loadTableMetadata(table)
        records = []
        # Load each record
        for recordId in metadata["recordIds"]:
            # Skip metadata file
            if recordId == "_metadata":
                continue
            record = self._loadRecord(table, recordId)
            if record:
                records.append(record)
        self._tablesCache[table] = records
        return records

    def _saveTable(self, table: str, data: List[Dict[str, Any]]) -> bool:
        """Saves all records to a table folder"""
        # The system table is handled specially
        if table == self._systemTableName:
            return self._saveSystemTable(data)

        tablePath = self._getTablePath(table)
        try:
            # Ensure table directory exists
            os.makedirs(tablePath, exist_ok=True)
            # Save each record as a separate file
            for record in data:
                if "id" not in record:
                    logger.error(f"Record missing ID in table {table}")
                    continue
                recordPath = self._getRecordPath(table, record["id"])
                with open(recordPath, 'w', encoding='utf-8') as f:
                    json.dump(record, f, indent=2, ensure_ascii=False)
            # Update the cache
            self._tablesCache[table] = data
            logger.debug(f"Successfully saved table {table}")
            return True
        except Exception as e:
            logger.error(f"Error saving table {table}: {str(e)}")
            logger.error(f"Error type: {type(e).__name__}")
            logger.error(f"Error details: {e.__dict__ if hasattr(e, '__dict__') else 'No details available'}")
            return False

    def _applyRecordFilter(self, records: List[Dict[str, Any]],
                           recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Applies a record filter to the records.

        All filter values are compared as strings (exact match, AND semantics).
        """
        if not recordFilter:
            return records
        filteredRecords = []
        for record in records:
            match = True
            for field, value in recordFilter.items():
                # Check if the field exists
                if field not in record:
                    match = False
                    break
                # Convert both values to strings for comparison
                recordValue = str(record[field])
                filterValue = str(value)
                # Direct string comparison
                if recordValue != filterValue:
                    match = False
                    break
            if match:
                filteredRecords.append(record)
        return filteredRecords

    def _registerInitialId(self, table: str, initialId: str) -> bool:
        """Registers the initial ID for a table."""
        try:
            systemData = self._loadSystemTable()
            if table not in systemData:
                systemData[table] = initialId
                success = self._saveSystemTable(systemData)
                if success:
                    logger.info(f"Initial ID {initialId} for table {table} registered")
                return success
            return True  # If already present, this is not an error
        except Exception as e:
            logger.error(f"Error registering the initial ID for table {table}: {e}")
            return False

    def _removeInitialId(self, table: str) -> bool:
        """Removes the initial ID for a table from the system table."""
        try:
            systemData = self._loadSystemTable()
            if table in systemData:
                del systemData[table]
                success = self._saveSystemTable(systemData)
                if success:
                    logger.info(f"Initial ID for table {table} removed from system table")
                return success
            return True  # If not present, this is not an error
        except Exception as e:
            logger.error(f"Error removing initial ID for table {table}: {e}")
            return False

    def _saveTableMetadata(self, table: str, metadata: Dict[str, Any]) -> bool:
        """Saves table metadata to a metadata file.

        NOTE: This method assumes the caller already holds the table lock.
        """
        try:
            # Create metadata file path
            metadataPath = os.path.join(self._getTablePath(table), "_metadata.json")
            # Save metadata (caller should already hold table lock)
            with open(metadataPath, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)
            # Update cache
            self._tableMetadataCache[table] = metadata
            return True
        except Exception as e:
            logger.error(f"Error saving metadata for table {table}: {e}")
            return False

    def updateContext(self, userId: str) -> None:
        """Updates the context of the database connector.

        Raises:
            ValueError: If userId is None.
        """
        if userId is None:
            raise ValueError("userId must be provided")
        self.userId = userId
        logger.info(f"Updated database context: userId={self.userId}")
        # Clear cache to ensure fresh data with new context
        self._tablesCache = {}
        self._tableMetadataCache = {}

    def clearTableCache(self, table: str) -> None:
        """Clears cache for a specific table to ensure fresh data."""
        if table in self._tablesCache:
            del self._tablesCache[table]
            logger.debug(f"Cleared cache for table: {table}")
        if table in self._tableMetadataCache:
            del self._tableMetadataCache[table]
            logger.debug(f"Cleared metadata cache for table: {table}")

    # Public API

    def getTables(self) -> List[str]:
        """Returns a list of all available tables."""
        tables = []
        try:
            for item in os.listdir(self.dbFolder):
                itemPath = os.path.join(self.dbFolder, item)
                # Underscore-prefixed entries (e.g. _system) are internal.
                if os.path.isdir(itemPath) and not item.startswith('_'):
                    tables.append(item)
        except Exception as e:
            logger.error(f"Error reading the database directory: {e}")
        return tables

    def getFields(self, table: str) -> List[str]:
        """Returns a list of all fields in a table.

        NOTE(review): derived from the first record only — assumes a
        homogeneous schema across records.
        """
        data = self._loadTable(table)
        if not data:
            return []
        fields = list(data[0].keys()) if data else []
        return fields

    def getSchema(self, table: str, language: str = None) -> Dict[str, Dict[str, Any]]:
        """Returns a schema object for a table with data types and labels.

        Types are inferred from the first record; labels default to the
        field name (the language parameter is currently unused).
        """
        data = self._loadTable(table)
        schema = {}
        if not data:
            return schema
        firstRecord = data[0]
        for field, value in firstRecord.items():
            dataType = type(value).__name__
            label = field
            schema[field] = {
                "type": dataType,
                "label": label
            }
        return schema

    def getRecordset(self, table: str, fieldFilter: List[str] = None,
                     recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Returns a list of records from a table, filtered by criteria.

        Args:
            table: Table name.
            fieldFilter: Optional list of field names to project.
            recordFilter: Optional field -> value equality filter.
        """
        # If we have specific record IDs in the filter, only load those records
        if recordFilter and "id" in recordFilter:
            recordId = recordFilter["id"]
            record = self._loadRecord(table, recordId)
            if record:
                records = [record]
            else:
                return []
        else:
            # Load all records if no specific ID filter
            records = self._loadTable(table)

        # Apply recordFilter if available
        if recordFilter:
            records = self._applyRecordFilter(records, recordFilter)

        # If fieldFilter is available, reduce the fields
        if fieldFilter and isinstance(fieldFilter, list):
            result = []
            for record in records:
                filteredRecord = {}
                for field in fieldFilter:
                    if field in record:
                        filteredRecord[field] = record[field]
                result.append(filteredRecord)
            return result
        return records

    def recordCreate(self, table: str, record: Dict[str, Any]) -> Dict[str, Any]:
        """Creates a new record in a table.

        Accepts a plain dict or a Pydantic model. A UUID id is generated
        when the record has none.

        FIX: the Pydantic model must be converted to a dict BEFORE the id
        check — membership tests and item assignment on a BaseModel raise
        TypeError, which previously broke the model path entirely.
        """
        # If record is a Pydantic model, convert to dict first
        if isinstance(record, BaseModel):
            record = to_dict(record)
        # Ensure record has an ID
        if "id" not in record:
            record["id"] = str(uuid.uuid4())
        # Save record
        self._saveRecord(table, record["id"], record)
        return record

    def recordModify(self, table: str, recordId: str, record: Dict[str, Any]) -> Dict[str, Any]:
        """Modifies an existing record in a table.

        Raises:
            ValueError: If the record does not exist, or if the payload
                tries to change the record ID.
        """
        # Load existing record
        existingRecord = self._loadRecord(table, recordId)
        if not existingRecord:
            raise ValueError(f"Record {recordId} not found in table {table}")
        # If record is a Pydantic model, convert to dict
        if isinstance(record, BaseModel):
            record = to_dict(record)
        # CRITICAL: Ensure we never modify the ID
        if "id" in record and str(record["id"]) != recordId:
            logger.error(f"Attempted to modify record ID from {recordId} to {record['id']}")
            raise ValueError("Cannot modify record ID - it must match the file name")
        # Update existing record with new data
        existingRecord.update(record)
        # Save updated record
        self._saveRecord(table, recordId, existingRecord)
        return existingRecord

    def recordDelete(self, table: str, recordId: str) -> bool:
        """Deletes a record from the table with atomic metadata operations.

        Returns:
            True if the record file was removed; False if the record was
            unknown, the file was missing, or an error occurred.
        """
        recordPath = self._getRecordPath(table, recordId)
        record_lock = self._get_file_lock(recordPath)
        table_lock = self._get_table_lock(table)
        try:
            # Acquire both locks with timeout - record lock first, then table lock
            if not record_lock.acquire(timeout=30):
                raise TimeoutError(f"Could not acquire record lock for {recordPath} within 30 seconds")
            if not table_lock.acquire(timeout=30):
                record_lock.release()
                raise TimeoutError(f"Could not acquire table lock for {table} within 30 seconds")

            # Record lock acquisition time
            self._lock_timeouts[recordPath] = time.time()
            self._lock_timeouts[f"table_{table}"] = time.time()

            # Load metadata
            metadata = self._loadTableMetadata(table)
            if recordId not in metadata["recordIds"]:
                return False

            # Check if it's an initial record
            initialId = self.getInitialId(table)
            if initialId is not None and initialId == recordId:
                self._removeInitialId(table)
                logger.info(f"Initial ID {recordId} for table {table} has been removed from the system table")

            # Delete the record file
            if os.path.exists(recordPath):
                os.remove(recordPath)
                # ATOMIC: Update metadata while holding both locks
                metadata["recordIds"].remove(recordId)
                self._saveTableMetadata(table, metadata)
                # Update table cache if it exists (also protected by table lock)
                if table in self._tablesCache:
                    self._tablesCache[table] = [r for r in self._tablesCache[table] if r.get("id") != recordId]
                return True
            else:
                return False
        except Exception as e:
            logger.error(f"Error deleting record {recordId} from table {table}: {e}")
            return False
        finally:
            # ALWAYS release both locks, even on error
            try:
                if table_lock.locked():
                    table_lock.release()
                if f"table_{table}" in self._lock_timeouts:
                    del self._lock_timeouts[f"table_{table}"]
            except Exception as release_error:
                logger.error(f"Error releasing table lock for {table}: {release_error}")
            try:
                if record_lock.locked():
                    record_lock.release()
                if recordPath in self._lock_timeouts:
                    del self._lock_timeouts[recordPath]
            except Exception as release_error:
                logger.error(f"Error releasing record lock for {recordPath}: {release_error}")

    def getInitialId(self, table_or_model) -> Optional[str]:
        """Returns the initial ID for a table.

        Accepts either a table-name string (legacy) or a model class whose
        __name__ is used as the table name.
        """
        # Handle both string table names (legacy) and model classes (new)
        if isinstance(table_or_model, str):
            table = table_or_model
        else:
            table = table_or_model.__name__
        systemData = self._loadSystemTable()
        initialId = systemData.get(table)
        logger.debug(f"Initial ID for table '{table}': {initialId}")
        return initialId