import json
import logging
import os
import threading
import time
import uuid
from typing import Any, Dict, List, Optional, TypedDict

from pydantic import BaseModel

from modules.shared.timeUtils import getUtcTimestamp

logger = logging.getLogger(__name__)


class TableCache(TypedDict):
    """Type definition for table cache entries."""
    recordIds: List[str]


class DatabaseConnector:
    """
    A connector for JSON-based data storage.

    Provides generic database operations without user/mandate filtering.
    Stores tables as folders and records as individual files
    (``<dbHost>/<dbDatabase>/<table>/<recordId>.json``), plus a per-table
    ``_metadata.json`` listing the known record IDs and a single ``_system``
    file mapping table names to their "initial" record IDs.
    """

    def __init__(self, dbHost: str, dbDatabase: str, dbUser: str = None,
                 dbPassword: str = None, userId: str = None):
        """Create a connector rooted at ``<dbHost>/<dbDatabase>``.

        Args:
            dbHost: Base directory that contains the database folders.
            dbDatabase: Database (sub-folder) name.
            dbUser: Unused credential, kept for interface compatibility.
            dbPassword: Unused credential, kept for interface compatibility.
            userId: Identity recorded in ``_createdBy``/``_modifiedBy``
                audit fields; defaults to an empty string.
        """
        self.dbHost = dbHost
        self.dbDatabase = dbDatabase
        self.dbUser = dbUser
        self.dbPassword = dbPassword

        # Set userId (default to empty string if None)
        self.userId = userId if userId is not None else ""

        # Initialize database system (creates the base directory)
        self.initDbSystem()

        # Set up database folder path
        self.dbFolder = os.path.join(self.dbHost, self.dbDatabase)

        # Caches for loaded data
        self._tablesCache: Dict[str, List[Dict[str, Any]]] = {}
        # Cache for table metadata (record IDs, etc.)
        self._tableMetadataCache: Dict[str, TableCache] = {}

        # Per-path file locks with timeout protection
        self._file_locks: Dict[str, threading.Lock] = {}
        self._lock_manager = threading.Lock()
        # Track when locks were acquired (path -> epoch seconds)
        self._lock_timeouts: Dict[str, float] = {}

        # Initialize system table
        self._systemTableName = "_system"
        self._initializeSystemTable()

        logger.debug(f"Context: userId={self.userId}")

    def initDbSystem(self):
        """Initialize the database system - creates necessary directories and structure.

        Raises:
            Exception: re-raised when the base directory cannot be created.
        """
        try:
            # Ensure the database directory exists
            self.dbFolder = os.path.join(self.dbHost, self.dbDatabase)
            os.makedirs(self.dbFolder, exist_ok=True)
            logger.info(f"Database system initialized: {self.dbFolder}")
        except Exception as e:
            logger.error(f"Error initializing database system: {e}")
            raise

    def _initializeSystemTable(self):
        """Initializes the system table if it doesn't exist yet."""
        systemTablePath = self._getTablePath(self._systemTableName)
        if not os.path.exists(systemTablePath):
            emptySystemTable: Dict[str, str] = {}
            self._saveSystemTable(emptySystemTable)
            logger.info(f"System table initialized in {systemTablePath}")
        else:
            # Load existing system table to ensure it's available in the cache
            self._loadSystemTable()
            logger.debug(f"Existing system table loaded from {systemTablePath}")

    def _loadSystemTable(self) -> Dict[str, str]:
        """Loads the system table (table name -> initial record ID).

        Returns an empty mapping on any error. The result is cached under a
        ``_``-prefixed key to avoid collision with regular table caches.
        """
        cacheKey = f"_{self._systemTableName}"
        if cacheKey in self._tablesCache:
            return self._tablesCache[cacheKey]

        systemTablePath = self._getTablePath(self._systemTableName)
        try:
            if os.path.exists(systemTablePath):
                with open(systemTablePath, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self._tablesCache[cacheKey] = data
                return data
            self._tablesCache[cacheKey] = {}
            return {}
        except Exception as e:
            logger.error(f"Error loading the system table: {e}")
            self._tablesCache[cacheKey] = {}
            return {}

    def _saveSystemTable(self, data: Dict[str, str]) -> bool:
        """Saves the system table with the initial IDs.

        Returns:
            True on success, False when the write failed.
        """
        systemTablePath = self._getTablePath(self._systemTableName)
        try:
            with open(systemTablePath, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            # Update cache
            self._tablesCache[f"_{self._systemTableName}"] = data
            return True
        except Exception as e:
            logger.error(f"Error saving the system table: {e}")
            return False

    def _getTablePath(self, table: str) -> str:
        """Returns the full path to a table folder."""
        return os.path.join(self.dbFolder, table)

    def _getRecordPath(self, table: str, recordId: str) -> str:
        """Returns the full path to a record file."""
        return os.path.join(self._getTablePath(table), f"{recordId}.json")

    def _get_file_lock(self, filepath: str, timeout_seconds: int = 30) -> threading.Lock:
        """Get the lock object for *filepath*, with stale-lock protection.

        FIX: the previous implementation called ``lock.release()`` on a lock
        potentially owned by another thread, which raises RuntimeError (and
        was silently swallowed). A stale lock is now handled by swapping in a
        fresh Lock object instead; the stuck holder keeps its old object and
        cannot interfere with new acquirers.
        """
        with self._lock_manager:
            if filepath not in self._file_locks:
                self._file_locks[filepath] = threading.Lock()
            lock = self._file_locks[filepath]

            # Check if lock is stale (held longer than timeout_seconds)
            acquired_at = self._lock_timeouts.get(filepath)
            if acquired_at is not None:
                lock_age = time.time() - acquired_at
                if lock_age > timeout_seconds:
                    logger.warning(f"Stale lock detected for {filepath}, age: {lock_age}s")
                    # Replace with a fresh lock; never release a foreign lock.
                    self._file_locks[filepath] = threading.Lock()
                    self._lock_timeouts.pop(filepath, None)
                    lock = self._file_locks[filepath]

            return lock

    def _get_table_lock(self, table: str, timeout_seconds: int = 30) -> threading.Lock:
        """Get table-level lock for metadata operations."""
        table_lock_key = f"table_{table}"
        return self._get_file_lock(table_lock_key, timeout_seconds)

    def _ensureTableDirectory(self, table: str) -> bool:
        """Ensures the table directory exists (the system table is a file, not a dir)."""
        if table == self._systemTableName:
            return True
        tablePath = self._getTablePath(table)
        try:
            os.makedirs(tablePath, exist_ok=True)
            return True
        except Exception as e:
            logger.error(f"Error creating table directory {tablePath}: {e}")
            return False

    def _loadTableMetadata(self, table: str) -> Dict[str, Any]:
        """Loads table metadata (list of record IDs) without loading actual records.

        NOTE: This method is safe to call without additional locking.
        """
        if table in self._tableMetadataCache:
            return self._tableMetadataCache[table]

        # Ensure table directory exists
        if not self._ensureTableDirectory(table):
            return {"recordIds": []}

        tablePath = self._getTablePath(table)
        metadata: Dict[str, Any] = {"recordIds": []}
        try:
            if os.path.exists(tablePath):
                for fileName in os.listdir(tablePath):
                    if fileName.endswith('.json') and fileName != '_metadata.json':
                        recordId = fileName[:-5]  # Remove .json extension
                        metadata["recordIds"].append(recordId)
            metadata["recordIds"].sort()
            # Only cache successfully built metadata
            self._tableMetadataCache[table] = metadata
        except Exception as e:
            logger.error(f"Error loading table metadata for {table}: {e}")
        return metadata

    def _loadRecord(self, table: str, recordId: str) -> Optional[Dict[str, Any]]:
        """Loads a single record from the table; returns None if missing or unreadable."""
        recordPath = self._getRecordPath(table, recordId)
        try:
            if os.path.exists(recordPath):
                with open(recordPath, 'r', encoding='utf-8') as f:
                    record = json.load(f)
                return record
        except Exception as e:
            logger.error(f"Error loading record {recordId} from table {table}: {e}")
        return None

    def _saveRecord(self, table: str, recordId: str, record: Dict[str, Any]) -> bool:
        """Saves a single record to the table with atomic metadata operations.

        Takes the record lock first, then the table lock (same order as
        recordDelete, so the two cannot deadlock against each other). The
        record file is written via temp-file + ``os.replace`` so readers
        never see a half-written file. Audit fields (``_createdAt`` /
        ``_createdBy`` / ``_modifiedAt`` / ``_modifiedBy``) are stamped here.

        Returns:
            True on success, False on any error (logged).

        Raises:
            TimeoutError: when a lock cannot be acquired within 30 seconds.
        """
        recordPath = self._getRecordPath(table, recordId)
        record_lock = self._get_file_lock(recordPath)
        table_lock = self._get_table_lock(table)
        # FIX: track our own acquisitions instead of testing lock.locked()
        # in the finally block -- locked() is True when ANY thread holds the
        # lock, so the old code could release a lock owned by another thread.
        record_acquired = False
        table_acquired = False
        try:
            # Acquire both locks with timeout - record lock first, then table lock
            if not record_lock.acquire(timeout=30):
                raise TimeoutError(f"Could not acquire record lock for {recordPath} within 30 seconds")
            record_acquired = True
            if not table_lock.acquire(timeout=30):
                raise TimeoutError(f"Could not acquire table lock for {table} within 30 seconds")
            table_acquired = True

            # Record lock acquisition time (for stale-lock detection)
            self._lock_timeouts[recordPath] = time.time()
            self._lock_timeouts[f"table_{table}"] = time.time()

            # Ensure table directory exists
            if not self._ensureTableDirectory(table):
                raise ValueError(f"Error creating table directory for {table}")

            # Ensure recordId is a string
            recordId = str(recordId)

            # CRITICAL: Ensure record ID matches the file name
            if "id" in record and str(record["id"]) != recordId:
                logger.error(f"Record ID mismatch: file name ID ({recordId}) does not match record ID ({record['id']})")
                raise ValueError(f"Record ID mismatch: file name ID ({recordId}) does not match record ID ({record['id']})")

            # Add audit metadata
            currentTime = getUtcTimestamp()
            if "_createdAt" not in record:
                record["_createdAt"] = currentTime
                record["_createdBy"] = self.userId
            record["_modifiedAt"] = currentTime
            record["_modifiedBy"] = self.userId

            # Save the record file using atomic write
            tempPath = recordPath + '.tmp'
            os.makedirs(os.path.dirname(recordPath), exist_ok=True)

            # Write to temporary file first
            with open(tempPath, 'w', encoding='utf-8') as f:
                json.dump(record, f, indent=2, ensure_ascii=False)

            # Verify the temporary file can be read back (validation)
            try:
                with open(tempPath, 'r', encoding='utf-8') as f:
                    json.load(f)  # This will fail if file is corrupted
            except Exception as e:
                logger.error(f"Validation failed for record {recordId}: {e}")
                if os.path.exists(tempPath):
                    os.remove(tempPath)
                raise ValueError(f"Record validation failed: {e}")

            # Atomic move from temp to final location
            os.replace(tempPath, recordPath)

            # ATOMIC: Update metadata while holding both locks
            metadata = self._loadTableMetadata(table)
            if recordId not in metadata["recordIds"]:
                metadata["recordIds"].append(recordId)
                metadata["recordIds"].sort()
                self._saveTableMetadata(table, metadata)

            # Update cache if it exists (also protected by table lock)
            if table in self._tablesCache:
                # Find and update existing record or append new one
                found = False
                for i, existing_record in enumerate(self._tablesCache[table]):
                    if str(existing_record.get("id")) == recordId:
                        self._tablesCache[table][i] = record
                        found = True
                        break
                if not found:
                    self._tablesCache[table].append(record)

            return True
        except Exception as e:
            logger.error(f"Error saving record {recordId} to table {table}: {e}")
            # Clean up temp file if it exists
            tempPath = self._getRecordPath(table, recordId) + '.tmp'
            if os.path.exists(tempPath):
                try:
                    os.remove(tempPath)
                except OSError:
                    pass
            return False
        finally:
            # ALWAYS release the locks we actually acquired, even on error
            try:
                if table_acquired:
                    table_lock.release()
                    self._lock_timeouts.pop(f"table_{table}", None)
            except Exception as release_error:
                logger.error(f"Error releasing table lock for {table}: {release_error}")
            try:
                if record_acquired:
                    record_lock.release()
                    self._lock_timeouts.pop(recordPath, None)
            except Exception as release_error:
                logger.error(f"Error releasing record lock for {recordPath}: {release_error}")

    def _loadTable(self, table: str) -> List[Dict[str, Any]]:
        """Loads all records from a table folder (cached after first load)."""
        # If the table is the system table, load it directly
        if table == self._systemTableName:
            return self._loadSystemTable()

        # If the table is already in the cache, use the cache
        if table in self._tablesCache:
            return self._tablesCache[table]

        # Load metadata first, then each record
        metadata = self._loadTableMetadata(table)
        records: List[Dict[str, Any]] = []
        for recordId in metadata["recordIds"]:
            # Skip metadata file
            if recordId == "_metadata":
                continue
            record = self._loadRecord(table, recordId)
            if record:
                records.append(record)

        self._tablesCache[table] = records
        return records

    def _saveTable(self, table: str, data: List[Dict[str, Any]]) -> bool:
        """Saves all records to a table folder (one file per record).

        Records without an ``id`` field are skipped with an error log.
        """
        # The system table is handled specially
        if table == self._systemTableName:
            return self._saveSystemTable(data)

        tablePath = self._getTablePath(table)
        try:
            # Ensure table directory exists
            os.makedirs(tablePath, exist_ok=True)

            # Save each record as a separate file
            for record in data:
                if "id" not in record:
                    logger.error(f"Record missing ID in table {table}")
                    continue
                recordPath = self._getRecordPath(table, record["id"])
                with open(recordPath, 'w', encoding='utf-8') as f:
                    json.dump(record, f, indent=2, ensure_ascii=False)

            # Update the cache
            self._tablesCache[table] = data
            logger.debug(f"Successfully saved table {table}")
            return True
        except Exception as e:
            logger.error(f"Error saving table {table}: {str(e)}")
            logger.error(f"Error type: {type(e).__name__}")
            logger.error(f"Error details: {e.__dict__ if hasattr(e, '__dict__') else 'No details available'}")
            return False

    def _applyRecordFilter(self, records: List[Dict[str, Any]],
                           recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Applies a record filter to the records.

        A record matches when every filter field exists on it and compares
        equal after str() conversion of both sides.
        """
        if not recordFilter:
            return records

        filteredRecords = []
        for record in records:
            match = True
            for field, value in recordFilter.items():
                # Check if the field exists
                if field not in record:
                    match = False
                    break
                # Convert both values to strings for comparison
                if str(record[field]) != str(value):
                    match = False
                    break
            if match:
                filteredRecords.append(record)
        return filteredRecords

    def _registerInitialId(self, table: str, initialId: str) -> bool:
        """Registers the initial ID for a table (no-op if one is already set)."""
        try:
            systemData = self._loadSystemTable()
            if table not in systemData:
                systemData[table] = initialId
                success = self._saveSystemTable(systemData)
                if success:
                    logger.info(f"Initial ID {initialId} for table {table} registered")
                return success
            return True  # If already present, this is not an error
        except Exception as e:
            logger.error(f"Error registering the initial ID for table {table}: {e}")
            return False

    def _removeInitialId(self, table: str) -> bool:
        """Removes the initial ID for a table from the system table."""
        try:
            systemData = self._loadSystemTable()
            if table in systemData:
                del systemData[table]
                success = self._saveSystemTable(systemData)
                if success:
                    logger.info(f"Initial ID for table {table} removed from system table")
                return success
            return True  # If not present, this is not an error
        except Exception as e:
            logger.error(f"Error removing initial ID for table {table}: {e}")
            return False

    def _saveTableMetadata(self, table: str, metadata: Dict[str, Any]) -> bool:
        """Saves table metadata to a metadata file.

        NOTE: This method assumes the caller already holds the table lock.
        """
        try:
            metadataPath = os.path.join(self._getTablePath(table), "_metadata.json")
            with open(metadataPath, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)
            # Update cache
            self._tableMetadataCache[table] = metadata
            return True
        except Exception as e:
            logger.error(f"Error saving metadata for table {table}: {e}")
            return False

    def updateContext(self, userId: str) -> None:
        """Updates the context of the database connector.

        Raises:
            ValueError: when userId is None.
        """
        if userId is None:
            raise ValueError("userId must be provided")
        self.userId = userId
        logger.info(f"Updated database context: userId={self.userId}")
        # Clear cache to ensure fresh data with new context
        self._tablesCache = {}
        self._tableMetadataCache = {}

    def clearTableCache(self, table: str) -> None:
        """Clears cache for a specific table to ensure fresh data."""
        if table in self._tablesCache:
            del self._tablesCache[table]
            logger.debug(f"Cleared cache for table: {table}")
        if table in self._tableMetadataCache:
            del self._tableMetadataCache[table]
            logger.debug(f"Cleared metadata cache for table: {table}")

    # Public API

    def getTables(self) -> List[str]:
        """Returns a list of all available tables (directories not starting with '_')."""
        tables: List[str] = []
        try:
            for item in os.listdir(self.dbFolder):
                itemPath = os.path.join(self.dbFolder, item)
                if os.path.isdir(itemPath) and not item.startswith('_'):
                    tables.append(item)
        except Exception as e:
            logger.error(f"Error reading the database directory: {e}")
        return tables

    def getFields(self, table: str) -> List[str]:
        """Returns a list of all fields in a table (keys of the first record)."""
        data = self._loadTable(table)
        if not data:
            return []
        return list(data[0].keys())

    def getSchema(self, table: str, language: str = None) -> Dict[str, Dict[str, Any]]:
        """Returns a schema object for a table with data types and labels.

        The schema is inferred from the first record only. The *language*
        parameter is currently unused and kept for interface compatibility.
        """
        data = self._loadTable(table)
        schema: Dict[str, Dict[str, Any]] = {}
        if not data:
            return schema
        firstRecord = data[0]
        for field, value in firstRecord.items():
            schema[field] = {
                "type": type(value).__name__,
                "label": field
            }
        return schema

    def getRecordset(self, table: str, fieldFilter: List[str] = None,
                     recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Returns a list of records from a table, filtered by criteria.

        Args:
            table: Table name.
            fieldFilter: Optional list of field names to project onto.
            recordFilter: Optional field->value equality filter; when it
                contains "id" only that single record is loaded from disk.
        """
        # If we have specific record IDs in the filter, only load those records
        if recordFilter and "id" in recordFilter:
            record = self._loadRecord(table, recordFilter["id"])
            if record:
                records = [record]
            else:
                return []
        else:
            # Load all records if no specific ID filter
            records = self._loadTable(table)

        # Apply recordFilter if available (also re-checks the id-loaded record)
        if recordFilter:
            records = self._applyRecordFilter(records, recordFilter)

        # If fieldFilter is available, reduce the fields
        if fieldFilter and isinstance(fieldFilter, list):
            result = []
            for record in records:
                filteredRecord = {field: record[field]
                                  for field in fieldFilter if field in record}
                result.append(filteredRecord)
            return result

        return records

    def recordCreate(self, table: str, record: Dict[str, Any]) -> Dict[str, Any]:
        """Creates a new record in a table, generating a UUID id when absent.

        FIX: a Pydantic model is converted to a dict BEFORE the id check;
        the previous order ran ``"id" in record`` / ``record["id"] = ...``
        against the model object, which does not support dict-style item
        assignment and mis-evaluates membership.
        """
        # If record is a Pydantic model, convert to dict first
        if isinstance(record, BaseModel):
            record = record.model_dump()

        # Ensure record has an ID
        if "id" not in record:
            record["id"] = str(uuid.uuid4())

        # Save record
        self._saveRecord(table, record["id"], record)
        return record

    def recordModify(self, table: str, recordId: str, record: Dict[str, Any]) -> Dict[str, Any]:
        """Modifies an existing record in a table (merge-update semantics).

        Raises:
            ValueError: when the record does not exist or the payload tries
                to change the record ID.
        """
        # Load existing record
        existingRecord = self._loadRecord(table, recordId)
        if not existingRecord:
            raise ValueError(f"Record {recordId} not found in table {table}")

        # If record is a Pydantic model, convert to dict
        if isinstance(record, BaseModel):
            record = record.model_dump()

        # CRITICAL: Ensure we never modify the ID
        if "id" in record and str(record["id"]) != recordId:
            logger.error(f"Attempted to modify record ID from {recordId} to {record['id']}")
            raise ValueError("Cannot modify record ID - it must match the file name")

        # Update existing record with new data and persist
        existingRecord.update(record)
        self._saveRecord(table, recordId, existingRecord)
        return existingRecord

    def recordDelete(self, table: str, recordId: str) -> bool:
        """Deletes a record from the table with atomic metadata operations.

        Lock order matches _saveRecord (record lock, then table lock).

        Returns:
            True when the record file was deleted; False when it was unknown
            or the file was already gone (stale metadata is cleaned either way).

        Raises:
            TimeoutError: when a lock cannot be acquired within 30 seconds.
        """
        recordPath = self._getRecordPath(table, recordId)
        record_lock = self._get_file_lock(recordPath)
        table_lock = self._get_table_lock(table)
        # FIX: explicit acquisition flags instead of lock.locked() in finally
        # (locked() is True for a lock held by ANY thread).
        record_acquired = False
        table_acquired = False
        try:
            # Acquire both locks with timeout - record lock first, then table lock
            if not record_lock.acquire(timeout=30):
                raise TimeoutError(f"Could not acquire record lock for {recordPath} within 30 seconds")
            record_acquired = True
            if not table_lock.acquire(timeout=30):
                raise TimeoutError(f"Could not acquire table lock for {table} within 30 seconds")
            table_acquired = True

            # Record lock acquisition time
            self._lock_timeouts[recordPath] = time.time()
            self._lock_timeouts[f"table_{table}"] = time.time()

            # Load metadata
            metadata = self._loadTableMetadata(table)
            if recordId not in metadata["recordIds"]:
                return False

            # Check if it's an initial record
            initialId = self.getInitialId(table)
            if initialId is not None and initialId == recordId:
                self._removeInitialId(table)
                logger.info(f"Initial ID {recordId} for table {table} has been removed from the system table")

            if os.path.exists(recordPath):
                # Delete the record file
                os.remove(recordPath)

                # ATOMIC: Update metadata while holding both locks
                metadata["recordIds"].remove(recordId)
                self._saveTableMetadata(table, metadata)

                # Update table cache if it exists (also protected by table lock)
                if table in self._tablesCache:
                    self._tablesCache[table] = [r for r in self._tablesCache[table]
                                                if r.get("id") != recordId]
                return True

            # FIX: the file is already gone but metadata still lists it --
            # clean the stale entry so it does not linger forever. The
            # return value (False) matches the previous behavior.
            metadata["recordIds"].remove(recordId)
            self._saveTableMetadata(table, metadata)
            return False
        except Exception as e:
            logger.error(f"Error deleting record {recordId} from table {table}: {e}")
            return False
        finally:
            # ALWAYS release the locks we actually acquired, even on error
            try:
                if table_acquired:
                    table_lock.release()
                    self._lock_timeouts.pop(f"table_{table}", None)
            except Exception as release_error:
                logger.error(f"Error releasing table lock for {table}: {release_error}")
            try:
                if record_acquired:
                    record_lock.release()
                    self._lock_timeouts.pop(recordPath, None)
            except Exception as release_error:
                logger.error(f"Error releasing record lock for {recordPath}: {release_error}")

    def getInitialId(self, table_or_model) -> Optional[str]:
        """Returns the initial ID for a table, or None when none is registered.

        Accepts either a string table name (legacy) or a model class whose
        ``__name__`` is used as the table name.
        """
        if isinstance(table_or_model, str):
            table = table_or_model
        else:
            table = table_or_model.__name__
        systemData = self._loadSystemTable()
        initialId = systemData.get(table)
        logger.debug(f"Initial ID for table '{table}': {initialId}")
        return initialId