From 37f01a215603e50a85fb2c543d76afcbeea5ed1d Mon Sep 17 00:00:00 2001 From: Christopher Gondek Date: Fri, 3 Oct 2025 16:51:43 +0200 Subject: [PATCH] chore: fix pydantic v2 issue --- modules/connectors/connectorDbPostgre.py | 589 ++++++++++++++--------- 1 file changed, 362 insertions(+), 227 deletions(-) diff --git a/modules/connectors/connectorDbPostgre.py b/modules/connectors/connectorDbPostgre.py index c17fa2c3..236cb796 100644 --- a/modules/connectors/connectorDbPostgre.py +++ b/modules/connectors/connectorDbPostgre.py @@ -18,101 +18,140 @@ logger = logging.getLogger(__name__) # No mapping needed - table name = Pydantic model name exactly + class SystemTable(BaseModel, ModelMixin): """Data model for system table entries""" + table_name: str = Field( description="Name of the table", frontend_type="text", frontend_readonly=True, - frontend_required=True + frontend_required=True, ) initial_id: Optional[str] = Field( default=None, description="Initial ID for the table", frontend_type="text", frontend_readonly=True, - frontend_required=False + frontend_required=False, ) + def _get_model_fields(model_class) -> Dict[str, str]: """Get all fields from Pydantic model and map to SQL types.""" - if not hasattr(model_class, '__fields__'): + # Pydantic v2 uses model_fields instead of __fields__ + if hasattr(model_class, "model_fields"): + model_fields = model_class.model_fields + elif hasattr(model_class, "__fields__"): + model_fields = model_class.__fields__ + else: return {} - + fields = {} - for field_name, field_info in model_class.__fields__.items(): - field_type = field_info.type_ - + for field_name, field_info in model_fields.items(): + # Pydantic v2 uses annotation instead of type_ + field_type = ( + field_info.annotation + if hasattr(field_info, "annotation") + else field_info.type_ + ) + # Check for JSONB fields (Dict, List, or complex types) - if (field_type == dict or - field_type == list or - (hasattr(field_type, '__origin__') and field_type.__origin__ in (dict, list)) or - field_name in ['execParameters', 'expectedDocumentFormats', 'resultDocuments', 'logs', 'messages', 'stats', 'tasks']): - fields[field_name] = 'JSONB' + if ( + field_type == dict + or field_type == list + or ( + hasattr(field_type, "__origin__") + and field_type.__origin__ in (dict, list) + ) + or field_name + in [ + "execParameters", + "expectedDocumentFormats", + "resultDocuments", + "logs", + "messages", + "stats", + "tasks", + ] + ): + fields[field_name] = "JSONB" # Simple type mapping - elif field_type in (str, type(None)) or (get_origin(field_type) is Union and type(None) in get_args(field_type)): - fields[field_name] = 'TEXT' + elif field_type in (str, type(None)) or ( + get_origin(field_type) is Union and type(None) in get_args(field_type) + ): + fields[field_name] = "TEXT" elif field_type == int: - fields[field_name] = 'INTEGER' + fields[field_name] = "INTEGER" elif field_type == float: - fields[field_name] = 'DOUBLE PRECISION' + fields[field_name] = "DOUBLE PRECISION" elif field_type == bool: - fields[field_name] = 'BOOLEAN' + fields[field_name] = "BOOLEAN" else: - fields[field_name] = 'TEXT' # Default to TEXT - + fields[field_name] = "TEXT" # Default to TEXT + return fields + # No caching needed with proper database + class DatabaseConnector: """ A connector for PostgreSQL-based data storage. Provides generic database operations without user/mandate filtering. Uses PostgreSQL with JSONB columns for flexible data storage. """ - def __init__(self, dbHost: str, dbDatabase: str, dbUser: str = None, dbPassword: str = None, dbPort: int = None, userId: str = None): + + def __init__( + self, + dbHost: str, + dbDatabase: str, + dbUser: str = None, + dbPassword: str = None, + dbPort: int = None, + userId: str = None, + ): # Store the input parameters self.dbHost = dbHost self.dbDatabase = dbDatabase self.dbUser = dbUser self.dbPassword = dbPassword self.dbPort = dbPort - + # Set userId (default to empty string if None) self.userId = userId if userId is not None else "" - + # Initialize database system first (creates database if needed) self.connection = None self.initDbSystem() - + # No caching needed with proper database - PostgreSQL handles performance - + # Thread safety self._lock = threading.Lock() - + # Initialize system table self._systemTableName = "_system" self._initializeSystemTable() - - + def initDbSystem(self): """Initialize the database system - creates database and tables.""" try: # Create database if it doesn't exist self._create_database_if_not_exists() - + # Create tables self._create_tables() - + # Establish connection to the database self._connect() - + logger.info("PostgreSQL database system initialized successfully") except Exception as e: logger.error(f"FATAL ERROR: Database system initialization failed: {e}") raise - + def _create_database_if_not_exists(self): """Create the database if it doesn't exist.""" try: @@ -123,29 +162,32 @@ class DatabaseConnector: database="postgres", user=self.dbUser, password=self.dbPassword, - client_encoding='utf8' + client_encoding="utf8", ) conn.autocommit = True - + with conn.cursor() as cursor: # Check if database exists - cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s", (self.dbDatabase,)) + cursor.execute( + "SELECT 1 FROM pg_database WHERE datname = %s", (self.dbDatabase,) + ) exists = cursor.fetchone() - + if not exists: # Create database with proper quoting for names with hyphens quoted_db_name = f'"{self.dbDatabase}"' cursor.execute(f"CREATE DATABASE {quoted_db_name}") logger.info(f"Created database: {self.dbDatabase}") - + conn.close() - + except Exception as e: logger.error(f"FATAL ERROR: Cannot create database: {e}") logger.error("Database connection failed - application cannot start") - raise RuntimeError(f"FATAL ERROR: Cannot create database '{self.dbDatabase}': {e}") - - + raise RuntimeError( + f"FATAL ERROR: Cannot create database '{self.dbDatabase}': {e}" + ) + def _create_tables(self): """Create only the system table - application tables are created by interfaces.""" try: @@ -156,10 +198,10 @@ class DatabaseConnector: database=self.dbDatabase, user=self.dbUser, password=self.dbPassword, - client_encoding='utf8' + client_encoding="utf8", ) conn.autocommit = True - + with conn.cursor() as cursor: # Create only the system table cursor.execute(""" @@ -172,12 +214,14 @@ class DatabaseConnector: ) """) conn.close() - + except Exception as e: logger.error(f"FATAL ERROR: Cannot create system table: {e}") - logger.error("Database system table creation failed - application cannot start") + logger.error( + "Database system table creation failed - application cannot start" + ) raise RuntimeError(f"FATAL ERROR: Cannot create system table: {e}") - + def _connect(self): """Establish connection to PostgreSQL database.""" try: @@ -188,14 +232,14 @@ class DatabaseConnector: database=self.dbDatabase, user=self.dbUser, password=self.dbPassword, - client_encoding='utf8', - cursor_factory=psycopg2.extras.RealDictCursor + client_encoding="utf8", + cursor_factory=psycopg2.extras.RealDictCursor, ) self.connection.autocommit = False # Use transactions except Exception as e: logger.error(f"Failed to connect to PostgreSQL: {e}") raise - + def _ensure_connection(self): """Ensure database connection is alive, reconnect if necessary.""" try: @@ -208,72 +252,78 @@ class DatabaseConnector: except Exception as e: logger.warning(f"Connection lost, reconnecting: {e}") self._connect() - + def _initializeSystemTable(self): """Initializes the system table if it doesn't exist yet.""" try: # First ensure the system table exists self._ensureTableExists(SystemTable) - + with self.connection.cursor() as cursor: # Check if system table has any data cursor.execute('SELECT COUNT(*) FROM "_system"') row = cursor.fetchone() - count = row['count'] if row else 0 - + count = row["count"] if row else 0 + self.connection.commit() except Exception as e: logger.error(f"Error initializing system table: {e}") self.connection.rollback() raise - + def _loadSystemTable(self) -> Dict[str, str]: """Loads the system table with the initial IDs.""" try: with self.connection.cursor() as cursor: cursor.execute('SELECT "table_name", "initial_id" FROM "_system"') rows = cursor.fetchall() - + system_data = {} for row in rows: - system_data[row['table_name']] = row['initial_id'] - + system_data[row["table_name"]] = row["initial_id"] + return system_data except Exception as e: logger.error(f"Error loading system table: {e}") return {} - + def _saveSystemTable(self, data: Dict[str, str]) -> bool: """Saves the system table with the initial IDs.""" try: with self.connection.cursor() as cursor: # Clear existing data cursor.execute('DELETE FROM "_system"') - + # Insert new data for table_name, initial_id in data.items(): - cursor.execute(""" + cursor.execute( + """ INSERT INTO "_system" ("table_name", "initial_id", "_modifiedAt") VALUES (%s, %s, %s) - """, (table_name, initial_id, get_utc_timestamp())) - + """, + (table_name, initial_id, get_utc_timestamp()), + ) + self.connection.commit() return True except Exception as e: logger.error(f"Error saving system table: {e}") self.connection.rollback() return False - + def _ensureSystemTableExists(self) -> bool: """Ensures the system table exists, creates it if it doesn't.""" try: self._ensure_connection() - + with self.connection.cursor() as cursor: # Check if system table exists - cursor.execute("SELECT COUNT(*) FROM pg_stat_user_tables WHERE relname = %s", (self._systemTableName,)) - exists = cursor.fetchone()['count'] > 0 - + cursor.execute( + "SELECT COUNT(*) FROM pg_stat_user_tables WHERE relname = %s", + (self._systemTableName,), + ) + exists = cursor.fetchone()["count"] > 0 + if not exists: # Create system table cursor.execute(f""" @@ -287,119 +337,142 @@ class DatabaseConnector: logger.info("System table created successfully") else: # Check if we need to add missing columns to existing table - cursor.execute(""" + cursor.execute( + """ SELECT column_name FROM information_schema.columns WHERE table_name = %s AND table_schema = 'public' - """, (self._systemTableName,)) - existing_columns = [row['column_name'] for row in cursor.fetchall()] - - if '_modifiedAt' not in existing_columns: - cursor.execute(f'ALTER TABLE "{self._systemTableName}" ADD COLUMN "_modifiedAt" DOUBLE PRECISION') - + """, + (self._systemTableName,), + ) + existing_columns = [row["column_name"] for row in cursor.fetchall()] + + if "_modifiedAt" not in existing_columns: + cursor.execute( + f'ALTER TABLE "{self._systemTableName}" ADD COLUMN "_modifiedAt" DOUBLE PRECISION' + ) + return True except Exception as e: logger.error(f"Error ensuring system table exists: {e}") return False - + def _ensureTableExists(self, model_class: type) -> bool: """Ensures a table exists, creates it if it doesn't.""" table = model_class.__name__ - + if table == "SystemTable": # Handle system table specially - it uses _system as the actual table name return self._ensureSystemTableExists() - + try: self._ensure_connection() - + with self.connection.cursor() as cursor: # Check if table exists by querying information_schema with case-insensitive search - cursor.execute(''' + cursor.execute( + """ SELECT COUNT(*) FROM information_schema.tables WHERE LOWER(table_name) = LOWER(%s) AND table_schema = 'public' - ''', (table,)) - exists = cursor.fetchone()['count'] > 0 - + """, + (table,), + ) + exists = cursor.fetchone()["count"] > 0 + if not exists: # Create table from Pydantic model self._create_table_from_model(cursor, table, model_class) - logger.info(f"Created table '{table}' with columns from Pydantic model") - + logger.info( + f"Created table '{table}' with columns from Pydantic model" + ) + self.connection.commit() return True except Exception as e: logger.error(f"Error ensuring table {table} exists: {e}") - if hasattr(self, 'connection') and self.connection: + if hasattr(self, "connection") and self.connection: self.connection.rollback() return False - def _create_table_from_model(self, cursor, table: str, model_class: type) -> None: """Create table with columns matching Pydantic model fields.""" fields = _get_model_fields(model_class) - + # Build column definitions with quoted identifiers to preserve exact case columns = ['"id" VARCHAR(255) PRIMARY KEY'] for field_name, sql_type in fields.items(): - if field_name != 'id': # Skip id, already defined + if field_name != "id": # Skip id, already defined columns.append(f'"{field_name}" {sql_type}') - + # Add metadata columns - columns.extend([ - '"_createdAt" DOUBLE PRECISION', - '"_modifiedAt" DOUBLE PRECISION', - '"_createdBy" VARCHAR(255)', - '"_modifiedBy" VARCHAR(255)' - ]) - + columns.extend( + [ + '"_createdAt" DOUBLE PRECISION', + '"_modifiedAt" DOUBLE PRECISION', + '"_createdBy" VARCHAR(255)', + '"_modifiedBy" VARCHAR(255)', + ] + ) + # Create table sql = f'CREATE TABLE IF NOT EXISTS "{table}" ({", ".join(columns)})' cursor.execute(sql) - + # Create indexes for foreign keys for field_name in fields: - if field_name.endswith('Id') and field_name != 'id': - cursor.execute(f'CREATE INDEX IF NOT EXISTS "idx_{table}_{field_name}" ON "{table}" ("{field_name}")') + if field_name.endswith("Id") and field_name != "id": + cursor.execute( + f'CREATE INDEX IF NOT EXISTS "idx_{table}_{field_name}" ON "{table}" ("{field_name}")' + ) - - def _save_record(self, cursor, table: str, recordId: str, record: Dict[str, Any], model_class: type) -> None: + def _save_record( + self, + cursor, + table: str, + recordId: str, + record: Dict[str, Any], + model_class: type, + ) -> None: """Save record to normalized table with explicit columns.""" # Get columns from Pydantic model instead of database schema fields = _get_model_fields(model_class) - columns = ['id'] + [field for field in fields.keys() if field != 'id'] + ['_createdAt', '_createdBy', '_modifiedAt', '_modifiedBy'] - - + columns = ( + ["id"] + + [field for field in fields.keys() if field != "id"] + + ["_createdAt", "_createdBy", "_modifiedAt", "_modifiedBy"] + ) + if not columns: logger.error(f"No columns found for table {table}") return - + # Filter record data to only include columns that exist in the table filtered_record = {k: v for k, v in record.items() if k in columns} - + # Ensure id is set - filtered_record['id'] = recordId - + filtered_record["id"] = recordId + # Prepare values in the correct order values = [] for col in columns: value = filtered_record.get(col) - + # Handle timestamp fields - store as Unix timestamps (floats) for consistency - if col in ['_createdAt', '_modifiedAt'] and value is not None: + if col in ["_createdAt", "_modifiedAt"] and value is not None: if isinstance(value, str): # Try to parse string as timestamp try: value = float(value) except: pass # Keep as string if parsing fails - + # Convert enum values to their string representation - elif hasattr(value, 'value'): + elif hasattr(value, "value"): value = value.value - + # Handle JSONB fields - ensure proper JSON format for PostgreSQL - elif col in fields and fields[col] == 'JSONB' and value is not None: + elif col in fields and fields[col] == "JSONB" and value is not None: import json + if isinstance(value, (dict, list)): # Convert Python objects to JSON string for PostgreSQL JSONB value = json.dumps(value) @@ -416,42 +489,51 @@ class DatabaseConnector: else: # Convert other types to JSON value = json.dumps(value) - + values.append(value) - - + # Build INSERT/UPDATE with quoted identifiers - col_names = ', '.join([f'"{col}"' for col in columns]) - placeholders = ', '.join(['%s'] * len(columns)) - updates = ', '.join([f'"{col}" = EXCLUDED."{col}"' for col in columns[1:] if col not in ['_createdAt', '_createdBy']]) - + col_names = ", ".join([f'"{col}"' for col in columns]) + placeholders = ", ".join(["%s"] * len(columns)) + updates = ", ".join( + [ + f'"{col}" = EXCLUDED."{col}"' + for col in columns[1:] + if col not in ["_createdAt", "_createdBy"] + ] + ) + sql = f'INSERT INTO "{table}" ({col_names}) VALUES ({placeholders}) ON CONFLICT ("id") DO UPDATE SET {updates}' - + cursor.execute(sql, values) - + def _loadRecord(self, model_class: type, recordId: str) -> Optional[Dict[str, Any]]: """Loads a single record from the normalized table.""" table = model_class.__name__ - + try: if not self._ensureTableExists(model_class): return None - + with self.connection.cursor() as cursor: cursor.execute(f'SELECT * FROM "{table}" WHERE "id" = %s', (recordId,)) row = cursor.fetchone() if not row: return None - + # Convert row to dict and handle JSONB fields record = dict(row) fields = _get_model_fields(model_class) - - + # Parse JSONB fields back to Python objects for field_name, field_type in fields.items(): - if field_type == 'JSONB' and field_name in record and record[field_name] is not None: + if ( + field_type == "JSONB" + and field_name in record + and record[field_name] is not None + ): import json + try: if isinstance(record[field_name], str): # Parse JSON string back to Python object @@ -464,26 +546,30 @@ class DatabaseConnector: record[field_name] = json.loads(str(record[field_name])) except (json.JSONDecodeError, TypeError, ValueError): # If parsing fails, keep as string - logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}") + logger.warning( + f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}" + ) pass - + return record except Exception as e: logger.error(f"Error loading record {recordId} from table {table}: {e}") return None - - def _saveRecord(self, model_class: type, recordId: str, record: Dict[str, Any]) -> bool: + + def _saveRecord( + self, model_class: type, recordId: str, record: Dict[str, Any] + ) -> bool: """Saves a single record to the table.""" table = model_class.__name__ - + try: if not self._ensureTableExists(model_class): return False - + recordId = str(recordId) if "id" in record and str(record["id"]) != recordId: raise ValueError(f"Record ID mismatch: {recordId} != {record['id']}") - + # Add metadata currentTime = get_utc_timestamp() if "_createdAt" not in record: @@ -491,74 +577,85 @@ class DatabaseConnector: record["_createdBy"] = self.userId record["_modifiedAt"] = currentTime record["_modifiedBy"] = self.userId - + with self.connection.cursor() as cursor: self._save_record(cursor, table, recordId, record, model_class) - + self.connection.commit() return True except Exception as e: logger.error(f"Error saving record {recordId} to table {table}: {e}") self.connection.rollback() return False - + def _loadTable(self, model_class: type) -> List[Dict[str, Any]]: """Loads all records from a normalized table.""" table = model_class.__name__ - + if table == self._systemTableName: return self._loadSystemTable() - + try: if not self._ensureTableExists(model_class): return [] - + with self.connection.cursor() as cursor: cursor.execute(f'SELECT * FROM "{table}" ORDER BY "id"') records = [dict(row) for row in cursor.fetchall()] - + # Handle JSONB fields for all records fields = _get_model_fields(model_class) for record in records: for field_name, field_type in fields.items(): - if field_type == 'JSONB' and field_name in record: + if field_type == "JSONB" and field_name in record: if record[field_name] is None: # Convert None to appropriate default based on field name - if field_name in ['logs', 'messages', 'tasks', 'expectedDocumentFormats', 'resultDocuments']: + if field_name in [ + "logs", + "messages", + "tasks", + "expectedDocumentFormats", + "resultDocuments", + ]: record[field_name] = [] - elif field_name in ['execParameters', 'stats']: + elif field_name in ["execParameters", "stats"]: record[field_name] = {} else: record[field_name] = None else: import json + try: if isinstance(record[field_name], str): # Parse JSON string back to Python object - record[field_name] = json.loads(record[field_name]) + record[field_name] = json.loads( + record[field_name] + ) elif isinstance(record[field_name], (dict, list)): # Already a Python object, keep as is pass else: # Try to parse as JSON - record[field_name] = json.loads(str(record[field_name])) + record[field_name] = json.loads( + str(record[field_name]) + ) except (json.JSONDecodeError, TypeError, ValueError): # If parsing fails, keep as string - logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}") + logger.warning( + f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}" + ) pass - + return records except Exception as e: logger.error(f"Error loading table {table}: {e}") return [] - - - + def _registerInitialId(self, table: str, initialId: str) -> bool: """Registers the initial ID for a table.""" try: systemData = self._loadSystemTable() - + if table not in systemData: systemData[table] = initialId success = self._saveSystemTable(systemData) @@ -568,58 +665,64 @@ class DatabaseConnector: else: # Check if the existing initial ID still exists in the table existingInitialId = systemData[table] - records = self.getRecordset(model_class, recordFilter={"id": existingInitialId}) + records = self.getRecordset( + model_class, recordFilter={"id": existingInitialId} + ) if not records: # The initial record no longer exists, update to the new one systemData[table] = initialId success = self._saveSystemTable(systemData) if success: - logger.info(f"Initial ID updated from {existingInitialId} to {initialId} for table {table}") + logger.info( + f"Initial ID updated from {existingInitialId} to {initialId} for table {table}" + ) return success else: return True except Exception as e: logger.error(f"Error registering the initial ID for table {table}: {e}") return False - + def _removeInitialId(self, table: str) -> bool: """Removes the initial ID for a table from the system table.""" try: systemData = self._loadSystemTable() - + if table in systemData: del systemData[table] success = self._saveSystemTable(systemData) if success: - logger.info(f"Initial ID for table {table} removed from system table") + logger.info( + f"Initial ID for table {table} removed from system table" + ) return success return True # If not present, this is not an error except Exception as e: logger.error(f"Error removing initial ID for table {table}: {e}") return False - + def updateContext(self, userId: str) -> None: """Updates the context of the database connector.""" if userId is None: raise ValueError("userId must be provided") - + self.userId = userId # No cache to clear - database handles data consistency - + # Public API - + def getTables(self) -> List[str]: """Returns a list of all available tables.""" tables = [] - + try: # Ensure connection is alive self._ensure_connection() - + if not self.connection or self.connection.closed: logger.error("Database connection is not available") return tables - + with self.connection.cursor() as cursor: cursor.execute(""" SELECT table_name @@ -628,104 +731,121 @@ class DatabaseConnector: ORDER BY table_name """) rows = cursor.fetchall() - tables = [row['table_name'] for row in rows] + tables = [row["table_name"] for row in rows] except Exception as e: logger.error(f"Error reading the database {self.dbDatabase}: {e}") - + return tables - + def getFields(self, model_class: type) -> List[str]: """Returns a list of all fields in a table.""" data = self._loadTable(model_class) - + if not data: return [] - + fields = list(data[0].keys()) if data else [] - + return fields - - def getSchema(self, model_class: type, language: str = None) -> Dict[str, Dict[str, Any]]: + + def getSchema( + self, model_class: type, language: str = None + ) -> Dict[str, Dict[str, Any]]: """Returns a schema object for a table with data types and labels.""" data = self._loadTable(model_class) - + schema = {} - + if not data: return schema - + firstRecord = data[0] - + for field, value in firstRecord.items(): dataType = type(value).__name__ label = field - - schema[field] = { - "type": dataType, - "label": label - } - + + schema[field] = {"type": dataType, "label": label} + return schema - - def getRecordset(self, model_class: type, fieldFilter: List[str] = None, recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]: + + def getRecordset( + self, + model_class: type, + fieldFilter: List[str] = None, + recordFilter: Dict[str, Any] = None, + ) -> List[Dict[str, Any]]: """Returns a list of records from a table, filtered by criteria.""" table = model_class.__name__ - + try: if not self._ensureTableExists(model_class): return [] - + # Build WHERE clause from recordFilter where_conditions = [] where_values = [] - + if recordFilter: for field, value in recordFilter.items(): where_conditions.append(f'"{field}" = %s') where_values.append(value) - + # Build the query if where_conditions: where_clause = " WHERE " + " AND ".join(where_conditions) else: where_clause = "" - + query = f'SELECT * FROM "{table}"{where_clause} ORDER BY "id"' - + with self.connection.cursor() as cursor: cursor.execute(query, where_values) records = [dict(row) for row in cursor.fetchall()] - + # Handle JSONB fields for all records fields = _get_model_fields(model_class) for record in records: for field_name, field_type in fields.items(): - if field_type == 'JSONB' and field_name in record: + if field_type == "JSONB" and field_name in record: if record[field_name] is None: # Convert None to appropriate default based on field name - if field_name in ['logs', 'messages', 'tasks', 'expectedDocumentFormats', 'resultDocuments']: + if field_name in [ + "logs", + "messages", + "tasks", + "expectedDocumentFormats", + "resultDocuments", + ]: record[field_name] = [] - elif field_name in ['execParameters', 'stats']: + elif field_name in ["execParameters", "stats"]: record[field_name] = {} else: record[field_name] = None else: import json + try: if isinstance(record[field_name], str): # Parse JSON string back to Python object - record[field_name] = json.loads(record[field_name]) + record[field_name] = json.loads( + record[field_name] + ) elif isinstance(record[field_name], (dict, list)): # Already a Python object, keep as is pass else: # Try to parse as JSON - record[field_name] = json.loads(str(record[field_name])) + record[field_name] = json.loads( + str(record[field_name]) + ) except (json.JSONDecodeError, TypeError, ValueError): # If parsing fails, keep as string - logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}") + logger.warning( + f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}" + ) pass - + # If fieldFilter is available, reduce the fields if fieldFilter and isinstance(fieldFilter, list): result = [] @@ -736,13 +856,15 @@ class DatabaseConnector: filteredRecord[field] = record[field] result.append(filteredRecord) return result - + return records except Exception as e: logger.error(f"Error loading records from table {table}: {e}") return [] - - def recordCreate(self, model_class: type, record: Union[Dict[str, Any], BaseModel]) -> Dict[str, Any]: + + def recordCreate( + self, model_class: type, record: Union[Dict[str, Any], BaseModel] + ) -> Dict[str, Any]: """Creates a new record in a table based on Pydantic model class.""" # If record is a Pydantic model, convert to dict if isinstance(record, BaseModel): @@ -751,14 +873,14 @@ class DatabaseConnector: record = record.copy() else: raise ValueError("Record must be a Pydantic model or dictionary") - + # Ensure record has an ID if "id" not in record: record["id"] = str(uuid.uuid4()) - + # Save record self._saveRecord(model_class, record["id"], record) - + # Check if this is the first record in the table and register as initial ID table = model_class.__name__ existingInitialId = self.getInitialId(model_class) @@ -766,17 +888,19 @@ class DatabaseConnector: # This is the first record, register it as the initial ID self._registerInitialId(table, record["id"]) logger.info(f"Registered initial ID {record['id']} for table {table}") - + return record - - def recordModify(self, model_class: type, recordId: str, record: Union[Dict[str, Any], BaseModel]) -> Dict[str, Any]: + + def recordModify( + self, model_class: type, recordId: str, record: Union[Dict[str, Any], BaseModel] + ) -> Dict[str, Any]: """Modifies an existing record in a table based on Pydantic model class.""" # Load existing record existingRecord = self._loadRecord(model_class, recordId) if not existingRecord: table = model_class.__name__ raise ValueError(f"Record {recordId} not found in table {table}") - + # If record is a Pydantic model, convert to dict if isinstance(record, BaseModel): record = to_dict(record) @@ -784,15 +908,19 @@ class DatabaseConnector: record = record.copy() else: raise ValueError("Record must be a Pydantic model or dictionary") - + # CRITICAL: Ensure we never modify the ID if "id" in record and str(record["id"]) != recordId: - logger.error(f"Attempted to modify record ID from {recordId} to {record['id']}") - raise ValueError("Cannot modify record ID - it must match the provided recordId") - + logger.error( + f"Attempted to modify record ID from {recordId} to {record['id']}" + ) + raise ValueError( + "Cannot modify record ID - it must match the provided recordId" + ) + # Update existing record with new data existingRecord.update(record) - + # Save updated record self._saveRecord(model_class, recordId, existingRecord) return existingRecord @@ -800,49 +928,56 @@ class DatabaseConnector: def recordDelete(self, model_class: type, recordId: str) -> bool: """Deletes a record from the table based on Pydantic model class.""" table = model_class.__name__ - + try: if not self._ensureTableExists(model_class): return False - + with self.connection.cursor() as cursor: # Check if record exists - cursor.execute(f'SELECT "id" FROM "{table}" WHERE "id" = %s', (recordId,)) + cursor.execute( + f'SELECT "id" FROM "{table}" WHERE "id" = %s', (recordId,) + ) if not cursor.fetchone(): return False - + # Check if it's an initial record initialId = self.getInitialId(model_class) if initialId is not None and initialId == recordId: self._removeInitialId(table) - logger.info(f"Initial ID {recordId} for table {table} has been removed from the system table") - + logger.info( + f"Initial ID {recordId} for table {table} has been removed from the system table" + ) + # Delete the record cursor.execute(f'DELETE FROM "{table}" WHERE "id" = %s', (recordId,)) - + # No cache to update - database handles consistency - + self.connection.commit() return True - + except Exception as e: logger.error(f"Error deleting record {recordId} from table {table}: {e}") self.connection.rollback() return False - def getInitialId(self, model_class: type) -> Optional[str]: """Returns the initial ID for a table.""" table = model_class.__name__ systemData = self._loadSystemTable() initialId = systemData.get(table) return initialId - + def close(self): """Close the database connection.""" - if hasattr(self, 'connection') and self.connection and not self.connection.closed: + if ( + hasattr(self, "connection") + and self.connection + and not self.connection.closed + ): self.connection.close() - + def __del__(self): """Cleanup method to close connection.""" try: