import psycopg2
import psycopg2.extras
import json
import os
import logging
from typing import List, Dict, Any, Optional, Union, get_origin, get_args
from datetime import datetime
import uuid
from pydantic import BaseModel
import threading
import time
from modules.shared.attributeUtils import to_dict
from modules.shared.timezoneUtils import get_utc_timestamp
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.interfaceAppModel import SystemTable

logger = logging.getLogger(__name__)

# No mapping needed - table name = Pydantic model name exactly.

# Field names that are always stored as JSONB, regardless of their annotation.
_JSONB_FIELD_NAMES = frozenset([
    'execParameters', 'expectedDocumentFormats', 'resultDocuments',
    'logs', 'messages', 'stats', 'tasks',
])
# JSONB fields whose SQL NULL is normalized to an empty list / dict when a
# whole table or recordset is loaded (single-record loads keep None as-is,
# matching the original behavior).
_JSONB_LIST_DEFAULT_FIELDS = frozenset([
    'logs', 'messages', 'tasks', 'expectedDocumentFormats', 'resultDocuments',
])
_JSONB_DICT_DEFAULT_FIELDS = frozenset(['execParameters', 'stats'])


def _get_model_fields(model_class) -> Dict[str, str]:
    """Map every field of a Pydantic model to a PostgreSQL column type.

    Returns a dict of field name -> SQL type ('JSONB', 'TEXT', 'INTEGER',
    'DOUBLE PRECISION' or 'BOOLEAN'); unknown types default to TEXT.
    Returns an empty dict for classes that are not Pydantic models.
    """
    if not hasattr(model_class, '__fields__'):
        return {}

    fields: Dict[str, str] = {}
    for field_name, field_info in model_class.__fields__.items():
        # Pydantic v1 exposes the resolved type as `type_`; v2 as `annotation`.
        field_type = getattr(field_info, 'type_', None)
        if field_type is None:
            field_type = getattr(field_info, 'annotation', None)
        origin = get_origin(field_type)

        # JSONB fields: Dict, List, parameterized container types, or
        # well-known complex field names.
        if (field_type in (dict, list)
                or origin in (dict, list)
                or field_name in _JSONB_FIELD_NAMES):
            fields[field_name] = 'JSONB'
        # Plain strings and Optional[...] unions are stored as TEXT.
        elif (field_type in (str, type(None))
                or (origin is Union and type(None) in get_args(field_type))):
            fields[field_name] = 'TEXT'
        elif field_type == int:
            fields[field_name] = 'INTEGER'
        elif field_type == float:
            fields[field_name] = 'DOUBLE PRECISION'
        elif field_type == bool:
            fields[field_name] = 'BOOLEAN'
        else:
            fields[field_name] = 'TEXT'  # Default to TEXT
    return fields


# No caching needed with proper database.
class DatabaseConnector:
    """
    A connector for PostgreSQL-based data storage.
    Provides generic database operations without user/mandate filtering.
    Uses PostgreSQL with JSONB columns for flexible data storage.
    """

    def __init__(self, dbHost: str, dbDatabase: str, dbUser: str = None,
                 dbPassword: str = None, dbPort: int = None, userId: str = None):
        # DEBUG: Log constructor parameters (password deliberately not logged).
        logger.info(f"DEBUG: DatabaseConnector constructor called with:")
        logger.info(f"  dbHost: '{dbHost}' (type: {type(dbHost)})")
        logger.info(f"  dbDatabase: '{dbDatabase}' (type: {type(dbDatabase)})")
        logger.info(f"  dbUser: '{dbUser}' (type: {type(dbUser)})")
        logger.info(f"  dbPort: {dbPort} (type: {type(dbPort)})")
        logger.info(f"  userId: '{userId}' (type: {type(userId)})")

        # Store the input parameters.
        self.dbHost = dbHost
        self.dbDatabase = dbDatabase
        self.dbUser = dbUser
        self.dbPassword = dbPassword
        self.dbPort = dbPort

        # Set userId (default to empty string if None).
        self.userId = userId if userId is not None else ""

        # Initialize database system first (creates database if needed).
        self.connection = None
        self.initDbSystem()

        # No caching needed with proper database - PostgreSQL handles performance.
        # Thread safety.
        self._lock = threading.Lock()

        # Initialize system table.
        self._systemTableName = "_system"
        self._initializeSystemTable()

    def initDbSystem(self):
        """Initialize the database system - creates database and tables."""
        try:
            # Create database if it doesn't exist.
            self._create_database_if_not_exists()
            # Create tables.
            self._create_tables()
            # Establish connection to the database.
            self._connect()
            logger.info("PostgreSQL database system initialized successfully")
        except Exception as e:
            logger.error(f"FATAL ERROR: Database system initialization failed: {e}")
            raise

    def _create_database_if_not_exists(self):
        """Create the database if it doesn't exist.

        Connects to the maintenance database 'postgres' with autocommit
        (CREATE DATABASE cannot run inside a transaction block).
        """
        conn = None
        try:
            # DEBUG: Log all database configuration values.
            logger.info(f"DEBUG: Database configuration - Host: {self.dbHost}, Database: {self.dbDatabase}, User: {self.dbUser}, Port: {self.dbPort}")
            logger.info(f"DEBUG: Database name type: {type(self.dbDatabase)}, value: '{self.dbDatabase}'")

            # Use the configured user for database creation.
            conn = psycopg2.connect(
                host=self.dbHost,
                port=self.dbPort,
                database="postgres",
                user=self.dbUser,
                password=self.dbPassword,
                client_encoding='utf8'
            )
            conn.autocommit = True

            with conn.cursor() as cursor:
                # Check if database exists.
                cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s", (self.dbDatabase,))
                exists = cursor.fetchone()
                if not exists:
                    # Create database with proper quoting for names with hyphens.
                    quoted_db_name = f'"{self.dbDatabase}"'
                    logger.info(f"DEBUG: Creating database with quoted name: {quoted_db_name}")
                    cursor.execute(f"CREATE DATABASE {quoted_db_name}")
                    logger.info(f"Created database: {self.dbDatabase}")
                else:
                    logger.info(f"Database {self.dbDatabase} already exists")
        except Exception as e:
            logger.error(f"FATAL ERROR: Cannot create database: {e}")
            logger.error("Database connection failed - application cannot start")
            raise RuntimeError(f"FATAL ERROR: Cannot create database '{self.dbDatabase}': {e}")
        finally:
            # FIX: close the maintenance connection even when an error occurred
            # (the original leaked it on the exception path).
            if conn is not None:
                conn.close()

    def _create_tables(self):
        """Create only the system table - application tables are created by interfaces."""
        conn = None
        try:
            # Use the configured user for table creation.
            conn = psycopg2.connect(
                host=self.dbHost,
                port=self.dbPort,
                database=self.dbDatabase,
                user=self.dbUser,
                password=self.dbPassword,
                client_encoding='utf8'
            )
            conn.autocommit = True

            with conn.cursor() as cursor:
                # Create only the system table.
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS _system (
                        id SERIAL PRIMARY KEY,
                        table_name VARCHAR(255) UNIQUE NOT NULL,
                        initial_id VARCHAR(255) NOT NULL,
                        _createdAt DOUBLE PRECISION,
                        _modifiedAt DOUBLE PRECISION
                    )
                """)
                logger.info("System table created successfully")
        except Exception as e:
            logger.error(f"FATAL ERROR: Cannot create system table: {e}")
            logger.error("Database system table creation failed - application cannot start")
            raise RuntimeError(f"FATAL ERROR: Cannot create system table: {e}")
        finally:
            # FIX: close the connection on both success and error paths.
            if conn is not None:
                conn.close()

    def _connect(self):
        """Establish connection to PostgreSQL database.

        Uses RealDictCursor so rows come back as dict-like objects, and
        disables autocommit so callers control transactions explicitly.
        """
        try:
            self.connection = psycopg2.connect(
                host=self.dbHost,
                port=self.dbPort,
                database=self.dbDatabase,
                user=self.dbUser,
                password=self.dbPassword,
                client_encoding='utf8',
                cursor_factory=psycopg2.extras.RealDictCursor
            )
            self.connection.autocommit = False  # Use transactions
            logger.info(f"Connected to PostgreSQL database: {self.dbDatabase}")
        except Exception as e:
            logger.error(f"Failed to connect to PostgreSQL: {e}")
            raise

    def _ensure_connection(self):
        """Ensure database connection is alive, reconnect if necessary."""
        try:
            if self.connection is None or self.connection.closed:
                self._connect()
            else:
                # Test connection with a simple query.
                with self.connection.cursor() as cursor:
                    cursor.execute("SELECT 1")
        except Exception as e:
            logger.warning(f"Connection lost, reconnecting: {e}")
            self._connect()

    def _initializeSystemTable(self):
        """Initializes the system table if it doesn't exist yet."""
        try:
            # First ensure the system table exists.
            self._ensureTableExists(SystemTable)

            with self.connection.cursor() as cursor:
                # Sanity check: the system table must be queryable.
                # (The count itself is not used further.)
                cursor.execute('SELECT COUNT(*) FROM "_system"')
                row = cursor.fetchone()
                count = row['count'] if row else 0
            self.connection.commit()
        except Exception as e:
            logger.error(f"Error initializing system table: {e}")
            self.connection.rollback()
            raise

    def _loadSystemTable(self) -> Dict[str, str]:
        """Loads the system table with the initial IDs.

        Returns a mapping of table name -> initial record id; empty on error.
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute('SELECT "table_name", "initial_id" FROM "_system"')
                rows = cursor.fetchall()
                return {row['table_name']: row['initial_id'] for row in rows}
        except Exception as e:
            logger.error(f"Error loading system table: {e}")
            return {}

    def _saveSystemTable(self, data: Dict[str, str]) -> bool:
        """Saves the system table with the initial IDs (full replace)."""
        try:
            with self.connection.cursor() as cursor:
                # Clear existing data.
                cursor.execute('DELETE FROM "_system"')
                # Insert new data.
                for table_name, initial_id in data.items():
                    cursor.execute("""
                        INSERT INTO "_system" ("table_name", "initial_id", "_modifiedAt")
                        VALUES (%s, %s, %s)
                    """, (table_name, initial_id, get_utc_timestamp()))
            self.connection.commit()
            return True
        except Exception as e:
            logger.error(f"Error saving system table: {e}")
            self.connection.rollback()
            return False

    def _ensureSystemTableExists(self) -> bool:
        """Ensures the system table exists, creates it if it doesn't."""
        try:
            self._ensure_connection()
            with self.connection.cursor() as cursor:
                # Check if system table exists.
                cursor.execute(
                    "SELECT COUNT(*) FROM pg_stat_user_tables WHERE relname = %s",
                    (self._systemTableName,))
                exists = cursor.fetchone()['count'] > 0

                if not exists:
                    # Create system table.
                    cursor.execute(f"""
                        CREATE TABLE "{self._systemTableName}" (
                            "table_name" VARCHAR(255) PRIMARY KEY,
                            "initial_id" VARCHAR(255),
                            "_createdAt" DOUBLE PRECISION,
                            "_modifiedAt" DOUBLE PRECISION
                        )
                    """)
                    logger.info("System table created successfully")
                else:
                    # Check if we need to add missing columns to the existing table.
                    cursor.execute("""
                        SELECT column_name FROM information_schema.columns
                        WHERE table_name = %s AND table_schema = 'public'
                    """, (self._systemTableName,))
                    existing_columns = [row['column_name'] for row in cursor.fetchall()]

                    if '_modifiedAt' not in existing_columns:
                        cursor.execute(
                            f'ALTER TABLE "{self._systemTableName}" ADD COLUMN "_modifiedAt" DOUBLE PRECISION')
                        logger.info("Added _modifiedAt column to existing system table")
            # FIX: the main connection runs with autocommit disabled, so the
            # DDL above must be committed explicitly (the original never did,
            # leaving the CREATE/ALTER in an open transaction).
            self.connection.commit()
            return True
        except Exception as e:
            logger.error(f"Error ensuring system table exists: {e}")
            if self.connection is not None and not self.connection.closed:
                self.connection.rollback()
            return False

    def _ensureTableExists(self, model_class: type) -> bool:
        """Ensures a table exists, creates it if it doesn't."""
        table = model_class.__name__
        if table == "SystemTable":
            # Handle system table specially - it uses _system as the actual table name.
            return self._ensureSystemTableExists()

        try:
            self._ensure_connection()
            with self.connection.cursor() as cursor:
                # Check if table exists by querying information_schema with
                # case-insensitive search.
                cursor.execute('''
                    SELECT COUNT(*) FROM information_schema.tables
                    WHERE LOWER(table_name) = LOWER(%s) AND table_schema = 'public'
                ''', (table,))
                exists = cursor.fetchone()['count'] > 0

                if not exists:
                    # Create table from Pydantic model.
                    self._create_table_from_model(cursor, table, model_class)
                    logger.info(f"Created table '{table}' with columns from Pydantic model")
            self.connection.commit()
            return True
        except Exception as e:
            logger.error(f"Error ensuring table {table} exists: {e}")
            if hasattr(self, 'connection') and self.connection:
                self.connection.rollback()
            return False

    def _create_table_from_model(self, cursor, table: str, model_class: type) -> None:
        """Create table with columns matching Pydantic model fields."""
        fields = _get_model_fields(model_class)

        # Build column definitions with quoted identifiers to preserve exact case.
        columns = ['"id" VARCHAR(255) PRIMARY KEY']
        for field_name, sql_type in fields.items():
            if field_name != 'id':  # Skip id, already defined
                columns.append(f'"{field_name}" {sql_type}')

        # Add metadata columns.
        columns.extend([
            '"_createdAt" DOUBLE PRECISION',
            '"_modifiedAt" DOUBLE PRECISION',
            '"_createdBy" VARCHAR(255)',
            '"_modifiedBy" VARCHAR(255)'
        ])

        # Create table.
        sql = f'CREATE TABLE IF NOT EXISTS "{table}" ({", ".join(columns)})'
        cursor.execute(sql)

        # Create indexes for foreign keys (fields named "...Id" by convention).
        for field_name in fields:
            if field_name.endswith('Id') and field_name != 'id':
                cursor.execute(
                    f'CREATE INDEX IF NOT EXISTS "idx_{table}_{field_name}" ON "{table}" ("{field_name}")')

    def _save_record(self, cursor, table: str, recordId: str,
                     record: Dict[str, Any], model_class: type) -> None:
        """Save record to normalized table with explicit columns (upsert)."""
        # Get columns from Pydantic model instead of database schema.
        fields = _get_model_fields(model_class)
        columns = (['id']
                   + [field for field in fields.keys() if field != 'id']
                   + ['_createdAt', '_createdBy', '_modifiedAt', '_modifiedBy'])
        if not columns:
            logger.error(f"No columns found for table {table}")
            return

        # Filter record data to only include columns that exist in the table.
        filtered_record = {k: v for k, v in record.items() if k in columns}
        # Ensure id is set.
        filtered_record['id'] = recordId

        # Prepare values in the correct column order.
        values = []
        for col in columns:
            value = filtered_record.get(col)
            # Handle timestamp fields - store as Unix timestamps (floats)
            # for consistency.
            if col in ['_createdAt', '_modifiedAt'] and value is not None:
                if isinstance(value, str):
                    try:
                        value = float(value)
                    except (TypeError, ValueError):
                        pass  # Keep as string if parsing fails
            # Convert enum values to their string representation.
            elif hasattr(value, 'value'):
                value = value.value
            # Handle JSONB fields - ensure proper JSON format for PostgreSQL.
            elif col in fields and fields[col] == 'JSONB' and value is not None:
                if isinstance(value, (dict, list)):
                    # Convert Python objects to JSON string for PostgreSQL JSONB.
                    value = json.dumps(value)
                elif isinstance(value, str):
                    # Keep valid JSON as-is; otherwise serialize the raw string.
                    try:
                        json.loads(value)
                    except (json.JSONDecodeError, TypeError):
                        value = json.dumps(value)
                else:
                    # Convert other types to JSON.
                    value = json.dumps(value)
            values.append(value)

        # Build INSERT ... ON CONFLICT upsert with quoted identifiers.
        # _createdAt/_createdBy are immutable after first insert.
        col_names = ', '.join([f'"{col}"' for col in columns])
        placeholders = ', '.join(['%s'] * len(columns))
        updates = ', '.join([f'"{col}" = EXCLUDED."{col}"'
                             for col in columns[1:]
                             if col not in ['_createdAt', '_createdBy']])
        sql = (f'INSERT INTO "{table}" ({col_names}) VALUES ({placeholders}) '
               f'ON CONFLICT ("id") DO UPDATE SET {updates}')
        cursor.execute(sql, values)

    def _decode_jsonb_fields(self, record: Dict[str, Any],
                             fields: Dict[str, str],
                             apply_defaults: bool) -> Dict[str, Any]:
        """Parse JSONB columns of a row back into Python objects (in place).

        When apply_defaults is True, NULL values of well-known list/dict
        fields are replaced by [] / {} respectively; otherwise None is kept.
        Unparseable values are kept as-is with a warning.
        """
        for field_name, field_type in fields.items():
            if field_type != 'JSONB' or field_name not in record:
                continue
            value = record[field_name]
            if value is None:
                if apply_defaults:
                    # Convert None to appropriate default based on field name.
                    if field_name in _JSONB_LIST_DEFAULT_FIELDS:
                        record[field_name] = []
                    elif field_name in _JSONB_DICT_DEFAULT_FIELDS:
                        record[field_name] = {}
                continue
            try:
                if isinstance(value, str):
                    # Parse JSON string back to Python object.
                    record[field_name] = json.loads(value)
                elif isinstance(value, (dict, list)):
                    # Already a Python object, keep as is.
                    pass
                else:
                    # Try to parse as JSON.
                    record[field_name] = json.loads(str(value))
            except (json.JSONDecodeError, TypeError, ValueError):
                # If parsing fails, keep as string.
                logger.warning(
                    f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}")
        return record

    def _loadRecord(self, model_class: type, recordId: str) -> Optional[Dict[str, Any]]:
        """Loads a single record from the normalized table."""
        table = model_class.__name__
        try:
            if not self._ensureTableExists(model_class):
                return None

            with self.connection.cursor() as cursor:
                cursor.execute(f'SELECT * FROM "{table}" WHERE "id" = %s', (recordId,))
                row = cursor.fetchone()
                if not row:
                    return None

                # Convert row to dict and parse JSONB fields
                # (no None->default normalization for single-record loads).
                record = dict(row)
                fields = _get_model_fields(model_class)
                return self._decode_jsonb_fields(record, fields, apply_defaults=False)
        except Exception as e:
            logger.error(f"Error loading record {recordId} from table {table}: {e}")
            return None

    def _saveRecord(self, model_class: type, recordId: str, record: Dict[str, Any]) -> bool:
        """Saves a single record to the table, adding audit metadata."""
        table = model_class.__name__
        try:
            if not self._ensureTableExists(model_class):
                return False

            recordId = str(recordId)
            if "id" in record and str(record["id"]) != recordId:
                raise ValueError(f"Record ID mismatch: {recordId} != {record['id']}")

            # Add metadata.
            currentTime = get_utc_timestamp()
            if "_createdAt" not in record:
                record["_createdAt"] = currentTime
                record["_createdBy"] = self.userId
            record["_modifiedAt"] = currentTime
            record["_modifiedBy"] = self.userId

            with self.connection.cursor() as cursor:
                self._save_record(cursor, table, recordId, record, model_class)
            self.connection.commit()
            return True
        except Exception as e:
            logger.error(f"Error saving record {recordId} to table {table}: {e}")
            self.connection.rollback()
            return False

    def _loadTable(self, model_class: type) -> List[Dict[str, Any]]:
        """Loads all records from a normalized table."""
        table = model_class.__name__
        if table == self._systemTableName:
            return self._loadSystemTable()

        try:
            if not self._ensureTableExists(model_class):
                return []

            with self.connection.cursor() as cursor:
                cursor.execute(f'SELECT * FROM "{table}" ORDER BY "id"')
                records = [dict(row) for row in cursor.fetchall()]

            # Handle JSONB fields for all records (NULL -> [] / {} defaults).
            fields = _get_model_fields(model_class)
            for record in records:
                self._decode_jsonb_fields(record, fields, apply_defaults=True)
            return records
        except Exception as e:
            logger.error(f"Error loading table {table}: {e}")
            return []

    def _registerInitialId(self, table: str, initialId: str) -> bool:
        """Registers the initial ID for a table.

        If an initial ID is already registered but its record no longer
        exists, the registration is updated to the new ID.
        """
        try:
            systemData = self._loadSystemTable()
            if table not in systemData:
                systemData[table] = initialId
                success = self._saveSystemTable(systemData)
                if success:
                    logger.info(f"Initial ID {initialId} for table {table} registered")
                return success

            # Check if the existing initial ID still exists in the table.
            # FIX: the original called self.getRecordset(model_class, ...) but
            # `model_class` is not in scope here (NameError); check existence
            # directly by table name instead.
            existingInitialId = systemData[table]
            with self.connection.cursor() as cursor:
                cursor.execute(f'SELECT "id" FROM "{table}" WHERE "id" = %s',
                               (existingInitialId,))
                stillExists = cursor.fetchone() is not None

            if not stillExists:
                # The initial record no longer exists, update to the new one.
                systemData[table] = initialId
                success = self._saveSystemTable(systemData)
                if success:
                    logger.info(
                        f"Initial ID updated from {existingInitialId} to {initialId} for table {table}")
                return success
            return True
        except Exception as e:
            logger.error(f"Error registering the initial ID for table {table}: {e}")
            return False

    def _removeInitialId(self, table: str) -> bool:
        """Removes the initial ID for a table from the system table."""
        try:
            systemData = self._loadSystemTable()
            if table in systemData:
                del systemData[table]
                success = self._saveSystemTable(systemData)
                if success:
                    logger.info(f"Initial ID for table {table} removed from system table")
                return success
            return True  # If not present, this is not an error
        except Exception as e:
            logger.error(f"Error removing initial ID for table {table}: {e}")
            return False

    def updateContext(self, userId: str) -> None:
        """Updates the context of the database connector."""
        if userId is None:
            raise ValueError("userId must be provided")
        self.userId = userId
        logger.info(f"Updated database context: userId={self.userId}")
        # No cache to clear - database handles data consistency.

    def clearTableCache(self, model_class: type) -> None:
        """No-op: Database handles data consistency automatically."""
        # No caching with proper database - PostgreSQL handles consistency.
        pass

    # Public API

    def getTables(self) -> List[str]:
        """Returns a list of all available tables (excluding internal '_'-prefixed ones)."""
        tables = []
        try:
            with self.connection.cursor() as cursor:
                # FIX: '_' is a single-character wildcard in SQL LIKE, so the
                # original pattern '_%' matched EVERY table name and the query
                # returned nothing. The underscore must be escaped (default
                # ESCAPE character is backslash) to filter only names that
                # literally start with '_'.
                cursor.execute(r"""
                    SELECT table_name FROM information_schema.tables
                    WHERE table_schema = 'public' AND table_name NOT LIKE '\_%'
                    ORDER BY table_name
                """)
                rows = cursor.fetchall()
                tables = [row['table_name'] for row in rows]
        except Exception as e:
            logger.error(f"Error reading the database: {e}")
        return tables

    def getFields(self, model_class: type) -> List[str]:
        """Returns a list of all fields in a table (taken from the first record)."""
        data = self._loadTable(model_class)
        if not data:
            return []
        return list(data[0].keys())

    def getSchema(self, model_class: type, language: str = None) -> Dict[str, Dict[str, Any]]:
        """Returns a schema object for a table with data types and labels.

        NOTE: types are inferred from the first record's runtime values,
        so an empty table yields an empty schema.
        """
        data = self._loadTable(model_class)
        schema = {}
        if not data:
            return schema

        firstRecord = data[0]
        for field, value in firstRecord.items():
            schema[field] = {
                "type": type(value).__name__,
                "label": field
            }
        return schema

    def getRecordset(self, model_class: type, fieldFilter: List[str] = None,
                     recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Returns a list of records from a table, filtered by criteria.

        recordFilter is a field->value equality filter (AND-combined);
        fieldFilter optionally projects the result to the listed fields.
        """
        table = model_class.__name__
        try:
            if not self._ensureTableExists(model_class):
                return []

            # Build parameterized WHERE clause from recordFilter.
            where_conditions = []
            where_values = []
            if recordFilter:
                for field, value in recordFilter.items():
                    where_conditions.append(f'"{field}" = %s')
                    where_values.append(value)

            where_clause = (" WHERE " + " AND ".join(where_conditions)) if where_conditions else ""
            query = f'SELECT * FROM "{table}"{where_clause} ORDER BY "id"'

            with self.connection.cursor() as cursor:
                cursor.execute(query, where_values)
                records = [dict(row) for row in cursor.fetchall()]

            # Handle JSONB fields for all records (NULL -> [] / {} defaults).
            fields = _get_model_fields(model_class)
            for record in records:
                self._decode_jsonb_fields(record, fields, apply_defaults=True)

            # If fieldFilter is available, reduce the fields.
            if fieldFilter and isinstance(fieldFilter, list):
                result = []
                for record in records:
                    filteredRecord = {field: record[field]
                                      for field in fieldFilter if field in record}
                    result.append(filteredRecord)
                return result
            return records
        except Exception as e:
            logger.error(f"Error loading records from table {table}: {e}")
            return []

    def recordCreate(self, model_class: type,
                     record: Union[Dict[str, Any], BaseModel]) -> Dict[str, Any]:
        """Creates a new record in a table based on Pydantic model class.

        Accepts either a Pydantic model or a plain dict; generates a UUID
        id when missing, and registers the record as the table's initial
        ID if none is registered yet.
        """
        # If record is a Pydantic model, convert to dict.
        if isinstance(record, BaseModel):
            record = to_dict(record)
        elif isinstance(record, dict):
            record = record.copy()
        else:
            raise ValueError("Record must be a Pydantic model or dictionary")

        # Ensure record has an ID.
        if "id" not in record:
            record["id"] = str(uuid.uuid4())

        # Save record.
        self._saveRecord(model_class, record["id"], record)

        # Check if this is the first record in the table and register as initial ID.
        table = model_class.__name__
        existingInitialId = self.getInitialId(model_class)
        if existingInitialId is None:
            # This is the first record, register it as the initial ID.
            self._registerInitialId(table, record["id"])
            logger.info(f"Registered initial ID {record['id']} for table {table}")
        return record

    def recordModify(self, model_class: type, recordId: str,
                     record: Union[Dict[str, Any], BaseModel]) -> Dict[str, Any]:
        """Modifies an existing record in a table based on Pydantic model class.

        Raises ValueError when the record does not exist, when the payload
        is neither a model nor a dict, or when the payload tries to change
        the record's id.
        """
        # Load existing record.
        existingRecord = self._loadRecord(model_class, recordId)
        if not existingRecord:
            table = model_class.__name__
            raise ValueError(f"Record {recordId} not found in table {table}")

        # If record is a Pydantic model, convert to dict.
        if isinstance(record, BaseModel):
            record = to_dict(record)
        elif isinstance(record, dict):
            record = record.copy()
        else:
            raise ValueError("Record must be a Pydantic model or dictionary")

        # CRITICAL: Ensure we never modify the ID.
        if "id" in record and str(record["id"]) != recordId:
            logger.error(f"Attempted to modify record ID from {recordId} to {record['id']}")
            raise ValueError("Cannot modify record ID - it must match the provided recordId")

        # Update existing record with new data and persist.
        existingRecord.update(record)
        self._saveRecord(model_class, recordId, existingRecord)
        return existingRecord

    def recordDelete(self, model_class: type, recordId: str) -> bool:
        """Deletes a record from the table based on Pydantic model class.

        Also removes the table's initial-ID registration when the deleted
        record was the registered initial record.
        """
        table = model_class.__name__
        try:
            if not self._ensureTableExists(model_class):
                return False

            with self.connection.cursor() as cursor:
                # Check if record exists.
                cursor.execute(f'SELECT "id" FROM "{table}" WHERE "id" = %s', (recordId,))
                if not cursor.fetchone():
                    return False

                # Check if it's an initial record.
                initialId = self.getInitialId(model_class)
                if initialId is not None and initialId == recordId:
                    self._removeInitialId(table)
                    logger.info(
                        f"Initial ID {recordId} for table {table} has been removed from the system table")

                # Delete the record.
                cursor.execute(f'DELETE FROM "{table}" WHERE "id" = %s', (recordId,))
            # No cache to update - database handles consistency.
            self.connection.commit()
            return True
        except Exception as e:
            logger.error(f"Error deleting record {recordId} from table {table}: {e}")
            self.connection.rollback()
            return False

    def getInitialId(self, model_class: type) -> Optional[str]:
        """Returns the initial ID for a table, or None if none is registered."""
        table = model_class.__name__
        systemData = self._loadSystemTable()
        return systemData.get(table)

    def close(self):
        """Close the database connection."""
        if hasattr(self, 'connection') and self.connection and not self.connection.closed:
            self.connection.close()

    def __del__(self):
        """Cleanup method to close connection."""
        try:
            self.close()
        except Exception:
            # Ignore errors during cleanup.
            pass