import psycopg2 import psycopg2.extras import logging from typing import List, Dict, Any, Optional, Union, get_origin, get_args import uuid from pydantic import BaseModel, Field import threading from modules.shared.timezoneUtils import getUtcTimestamp from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) # No mapping needed - table name = Pydantic model name exactly class SystemTable(BaseModel): """Data model for system table entries""" table_name: str = Field( description="Name of the table", frontend_type="text", frontend_readonly=True, frontend_required=True, ) initial_id: Optional[str] = Field( default=None, description="Initial ID for the table", frontend_type="text", frontend_readonly=True, frontend_required=False, ) def _get_model_fields(model_class) -> Dict[str, str]: """Get all fields from Pydantic model and map to SQL types.""" # Pydantic v2 model_fields = model_class.model_fields fields = {} for field_name, field_info in model_fields.items(): # Pydantic v2 field_type = field_info.annotation # Check for JSONB fields (Dict, List, or complex types) if ( field_type == dict or field_type == list or ( hasattr(field_type, "__origin__") and field_type.__origin__ in (dict, list) ) or field_name in [ "execParameters", "expectedDocumentFormats", "resultDocuments", "logs", "messages", "stats", "tasks", ] ): fields[field_name] = "JSONB" # Simple type mapping elif field_type in (str, type(None)) or ( get_origin(field_type) is Union and type(None) in get_args(field_type) ): fields[field_name] = "TEXT" elif field_type == int: fields[field_name] = "INTEGER" elif field_type == float: fields[field_name] = "DOUBLE PRECISION" elif field_type == bool: fields[field_name] = "BOOLEAN" else: fields[field_name] = "TEXT" # Default to TEXT return fields # No caching needed with proper database class DatabaseConnector: """ A connector for PostgreSQL-based data storage. Provides generic database operations without user/mandate filtering. Uses PostgreSQL with JSONB columns for flexible data storage. """ def __init__( self, dbHost: str, dbDatabase: str, dbUser: str = None, dbPassword: str = None, dbPort: int = None, userId: str = None, ): # Store the input parameters self.dbHost = dbHost self.dbDatabase = dbDatabase self.dbUser = dbUser self.dbPassword = dbPassword self.dbPort = dbPort # Set userId (default to empty string if None) self.userId = userId if userId is not None else "" # Initialize database system first (creates database if needed) self.connection = None self.initDbSystem() # No caching needed with proper database - PostgreSQL handles performance # Thread safety self._lock = threading.Lock() # Initialize system table self._systemTableName = "_system" self._initializeSystemTable() def initDbSystem(self): """Initialize the database system - creates database and tables.""" try: # Create database if it doesn't exist self._create_database_if_not_exists() # Create tables self._create_tables() # Establish connection to the database self._connect() logger.info("PostgreSQL database system initialized successfully") except Exception as e: logger.error(f"FATAL ERROR: Database system initialization failed: {e}") raise def _create_database_if_not_exists(self): """Create the database if it doesn't exist.""" try: # Use the configured user for database creation conn = psycopg2.connect( host=self.dbHost, port=self.dbPort, database="postgres", user=self.dbUser, password=self.dbPassword, client_encoding="utf8", ) conn.autocommit = True with conn.cursor() as cursor: # Check if database exists cursor.execute( "SELECT 1 FROM pg_database WHERE datname = %s", (self.dbDatabase,) ) exists = cursor.fetchone() if not exists: # Create database with proper quoting for names with hyphens quoted_db_name = f'"{self.dbDatabase}"' cursor.execute(f"CREATE DATABASE {quoted_db_name}") logger.info(f"Created database: {self.dbDatabase}") conn.close() except Exception as e: logger.error(f"FATAL ERROR: Cannot create database: {e}") logger.error("Database connection failed - application cannot start") raise RuntimeError( f"FATAL ERROR: Cannot create database '{self.dbDatabase}': {e}" ) def _create_tables(self): """Create only the system table - application tables are created by interfaces.""" try: # Use the configured user for table creation conn = psycopg2.connect( host=self.dbHost, port=self.dbPort, database=self.dbDatabase, user=self.dbUser, password=self.dbPassword, client_encoding="utf8", ) conn.autocommit = True with conn.cursor() as cursor: # Create only the system table cursor.execute(""" CREATE TABLE IF NOT EXISTS _system ( id SERIAL PRIMARY KEY, table_name VARCHAR(255) UNIQUE NOT NULL, initial_id VARCHAR(255) NOT NULL, _createdAt DOUBLE PRECISION, _modifiedAt DOUBLE PRECISION ) """) conn.close() except Exception as e: logger.error(f"FATAL ERROR: Cannot create system table: {e}") logger.error( "Database system table creation failed - application cannot start" ) raise RuntimeError(f"FATAL ERROR: Cannot create system table: {e}") def _connect(self): """Establish connection to PostgreSQL database.""" try: # Use configured user for main connection with proper parameter handling self.connection = psycopg2.connect( host=self.dbHost, port=self.dbPort, database=self.dbDatabase, user=self.dbUser, password=self.dbPassword, client_encoding="utf8", cursor_factory=psycopg2.extras.RealDictCursor, ) self.connection.autocommit = False # Use transactions except Exception as e: logger.error(f"Failed to connect to PostgreSQL: {e}") raise def _ensure_connection(self): """Ensure database connection is alive, reconnect if necessary.""" try: if self.connection is None or self.connection.closed: self._connect() else: # Test connection with a simple query with self.connection.cursor() as cursor: cursor.execute("SELECT 1") except Exception as e: logger.warning(f"Connection lost, reconnecting: {e}") self._connect() def _initializeSystemTable(self): """Initializes the system table if it doesn't exist yet.""" try: # First ensure the system table exists self._ensureTableExists(SystemTable) with self.connection.cursor() as cursor: # Check if system table has any data cursor.execute('SELECT COUNT(*) FROM "_system"') row = cursor.fetchone() count = row["count"] if row else 0 self.connection.commit() except Exception as e: logger.error(f"Error initializing system table: {e}") self.connection.rollback() raise def _loadSystemTable(self) -> Dict[str, str]: """Loads the system table with the initial IDs.""" try: with self.connection.cursor() as cursor: cursor.execute('SELECT "table_name", "initial_id" FROM "_system"') rows = cursor.fetchall() system_data = {} for row in rows: system_data[row["table_name"]] = row["initial_id"] return system_data except Exception as e: logger.error(f"Error loading system table: {e}") return {} def _saveSystemTable(self, data: Dict[str, str]) -> bool: """Saves the system table with the initial IDs.""" try: with self.connection.cursor() as cursor: # Clear existing data cursor.execute('DELETE FROM "_system"') # Insert new data for table_name, initial_id in data.items(): cursor.execute( """ INSERT INTO "_system" ("table_name", "initial_id", "_modifiedAt") VALUES (%s, %s, %s) """, (table_name, initial_id, getUtcTimestamp()), ) self.connection.commit() return True except Exception as e: logger.error(f"Error saving system table: {e}") self.connection.rollback() return False def _ensureSystemTableExists(self) -> bool: """Ensures the system table exists, creates it if it doesn't.""" try: self._ensure_connection() with self.connection.cursor() as cursor: # Check if system table exists cursor.execute( "SELECT COUNT(*) FROM pg_stat_user_tables WHERE relname = %s", (self._systemTableName,), ) exists = cursor.fetchone()["count"] > 0 if not exists: # Create system table cursor.execute(f""" CREATE TABLE "{self._systemTableName}" ( "table_name" VARCHAR(255) PRIMARY KEY, "initial_id" VARCHAR(255), "_createdAt" DOUBLE PRECISION, "_modifiedAt" DOUBLE PRECISION ) """) logger.info("System table created successfully") else: # Check if we need to add missing columns to existing table cursor.execute( """ SELECT column_name FROM information_schema.columns WHERE table_name = %s AND table_schema = 'public' """, (self._systemTableName,), ) existing_columns = [row["column_name"] for row in cursor.fetchall()] if "_modifiedAt" not in existing_columns: cursor.execute( f'ALTER TABLE "{self._systemTableName}" ADD COLUMN "_modifiedAt" DOUBLE PRECISION' ) return True except Exception as e: logger.error(f"Error ensuring system table exists: {e}") return False def _ensureTableExists(self, model_class: type) -> bool: """Ensures a table exists, creates it if it doesn't.""" table = model_class.__name__ if table == "SystemTable": # Handle system table specially - it uses _system as the actual table name return self._ensureSystemTableExists() try: self._ensure_connection() with self.connection.cursor() as cursor: # Check if table exists by querying information_schema with case-insensitive search cursor.execute( """ SELECT COUNT(*) FROM information_schema.tables WHERE LOWER(table_name) = LOWER(%s) AND table_schema = 'public' """, (table,), ) exists = cursor.fetchone()["count"] > 0 if not exists: # Create table from Pydantic model self._create_table_from_model(cursor, table, model_class) logger.info( f"Created table '{table}' with columns from Pydantic model" ) else: # Table exists: ensure all columns from model are present (simple additive migration) try: cursor.execute( """ SELECT column_name FROM information_schema.columns WHERE LOWER(table_name) = LOWER(%s) AND table_schema = 'public' """, (table,), ) existing_columns = { row["column_name"] for row in cursor.fetchall() } # Desired columns based on model model_fields = _get_model_fields(model_class) desired_columns = ( set(["id"]) | set(model_fields.keys()) | {"_createdAt", "_modifiedAt", "_createdBy", "_modifiedBy"} ) # Add missing columns for col in sorted(desired_columns - existing_columns): # Determine SQL type if col in ["id"]: continue # primary key exists already sql_type = model_fields.get(col) if col in ["_createdAt"]: sql_type = "DOUBLE PRECISION" elif col in ["_modifiedAt"]: sql_type = "DOUBLE PRECISION" elif col in ["_createdBy", "_modifiedBy"]: sql_type = "VARCHAR(255)" if not sql_type: sql_type = "TEXT" try: cursor.execute( f'ALTER TABLE "{table}" ADD COLUMN "{col}" {sql_type}' ) logger.info( f"Added missing column '{col}' ({sql_type}) to '{table}'" ) except Exception as add_err: logger.warning( f"Could not add column '{col}' to '{table}': {add_err}" ) except Exception as ensure_err: logger.warning( f"Could not ensure columns for existing table '{table}': {ensure_err}" ) self.connection.commit() return True except Exception as e: logger.error(f"Error ensuring table {table} exists: {e}") if hasattr(self, "connection") and self.connection: self.connection.rollback() return False def _create_table_from_model(self, cursor, table: str, model_class: type) -> None: """Create table with columns matching Pydantic model fields.""" fields = _get_model_fields(model_class) # Build column definitions with quoted identifiers to preserve exact case columns = ['"id" VARCHAR(255) PRIMARY KEY'] for field_name, sql_type in fields.items(): if field_name != "id": # Skip id, already defined columns.append(f'"{field_name}" {sql_type}') # Add metadata columns columns.extend( [ '"_createdAt" DOUBLE PRECISION', '"_modifiedAt" DOUBLE PRECISION', '"_createdBy" VARCHAR(255)', '"_modifiedBy" VARCHAR(255)', ] ) # Create table sql = f'CREATE TABLE IF NOT EXISTS "{table}" ({", ".join(columns)})' cursor.execute(sql) # Create indexes for foreign keys for field_name in fields: if field_name.endswith("Id") and field_name != "id": cursor.execute( f'CREATE INDEX IF NOT EXISTS "idx_{table}_{field_name}" ON "{table}" ("{field_name}")' ) def _save_record( self, cursor, table: str, recordId: str, record: Dict[str, Any], model_class: type, ) -> None: """Save record to normalized table with explicit columns.""" # Get columns from Pydantic model instead of database schema fields = _get_model_fields(model_class) columns = ( ["id"] + [field for field in fields.keys() if field != "id"] + ["_createdAt", "_createdBy", "_modifiedAt", "_modifiedBy"] ) if not columns: logger.error(f"No columns found for table {table}") return # Filter record data to only include columns that exist in the table filtered_record = {k: v for k, v in record.items() if k in columns} # Ensure id is set filtered_record["id"] = recordId # Prepare values in the correct order values = [] for col in columns: value = filtered_record.get(col) # Handle timestamp fields - store as Unix timestamps (floats) for consistency if col in ["_createdAt", "_modifiedAt"] and value is not None: if isinstance(value, str): # Try to parse string as timestamp try: value = float(value) except: pass # Keep as string if parsing fails # Convert enum values to their string representation elif hasattr(value, "value"): value = value.value # Handle JSONB fields - ensure proper JSON format for PostgreSQL elif col in fields and fields[col] == "JSONB" and value is not None: import json if isinstance(value, (dict, list)): # Convert Python objects to JSON string for PostgreSQL JSONB value = json.dumps(value) elif isinstance(value, str): # Validate that it's valid JSON, if not, try to parse and re-serialize try: # Test if it's already valid JSON json.loads(value) # If successful, keep as is pass except (json.JSONDecodeError, TypeError): # If not valid JSON, convert to JSON string value = json.dumps(value) elif hasattr(value, 'model_dump'): # Handle Pydantic models value = json.dumps(value.model_dump()) else: # Convert other types to JSON value = json.dumps(value) values.append(value) # Build INSERT/UPDATE with quoted identifiers col_names = ", ".join([f'"{col}"' for col in columns]) placeholders = ", ".join(["%s"] * len(columns)) updates = ", ".join( [ f'"{col}" = EXCLUDED."{col}"' for col in columns[1:] if col not in ["_createdAt", "_createdBy"] ] ) sql = f'INSERT INTO "{table}" ({col_names}) VALUES ({placeholders}) ON CONFLICT ("id") DO UPDATE SET {updates}' cursor.execute(sql, values) def _loadRecord(self, model_class: type, recordId: str) -> Optional[Dict[str, Any]]: """Loads a single record from the normalized table.""" table = model_class.__name__ try: if not self._ensureTableExists(model_class): return None with self.connection.cursor() as cursor: cursor.execute(f'SELECT * FROM "{table}" WHERE "id" = %s', (recordId,)) row = cursor.fetchone() if not row: return None # Convert row to dict and handle JSONB fields record = dict(row) fields = _get_model_fields(model_class) # Ensure numeric fields are properly typed and parse JSONB fields for field_name, field_type in fields.items(): # Ensure numeric fields (float/int) are properly typed # psycopg2 may return them as strings in some environments (e.g., Azure PostgreSQL) if field_type in ("DOUBLE PRECISION", "INTEGER") and field_name in record: value = record[field_name] if value is not None: try: if field_type == "DOUBLE PRECISION": record[field_name] = float(value) elif field_type == "INTEGER": record[field_name] = int(value) except (ValueError, TypeError): # If conversion fails, log warning but keep original value logger.warning( f"Could not convert {field_name} to {field_type} for record {recordId}: {value}" ) elif ( field_type == "JSONB" and field_name in record and record[field_name] is not None ): import json try: if isinstance(record[field_name], str): # Parse JSON string back to Python object record[field_name] = json.loads(record[field_name]) elif isinstance(record[field_name], (dict, list)): # Already a Python object, keep as is pass else: # Try to parse as JSON record[field_name] = json.loads(str(record[field_name])) except (json.JSONDecodeError, TypeError, ValueError): # If parsing fails, keep as string logger.warning( f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}" ) pass return record except Exception as e: logger.error(f"Error loading record {recordId} from table {table}: {e}") return None def _saveRecord( self, model_class: type, recordId: str, record: Dict[str, Any] ) -> bool: """Saves a single record to the table.""" table = model_class.__name__ try: if not self._ensureTableExists(model_class): return False recordId = str(recordId) if "id" in record and str(record["id"]) != recordId: raise ValueError(f"Record ID mismatch: {recordId} != {record['id']}") # Add metadata currentTime = getUtcTimestamp() # Set _createdAt and _createdBy if this is a new record (record doesn't have _createdAt) if "_createdAt" not in record: record["_createdAt"] = currentTime # Only set _createdBy if userId is valid (not None or empty string) if self.userId: record["_createdBy"] = self.userId else: logger.warning(f"Attempting to create record with empty userId - _createdBy will not be set") # Also ensure _createdBy is set even if _createdAt exists but _createdBy is missing/empty elif "_createdBy" not in record or not record.get("_createdBy"): if self.userId: record["_createdBy"] = self.userId else: logger.warning(f"Attempting to set _createdBy with empty userId for record {recordId}") # Always update modification metadata record["_modifiedAt"] = currentTime if self.userId: record["_modifiedBy"] = self.userId with self.connection.cursor() as cursor: self._save_record(cursor, table, recordId, record, model_class) self.connection.commit() return True except Exception as e: logger.error(f"Error saving record {recordId} to table {table}: {e}") self.connection.rollback() return False def _loadTable(self, model_class: type) -> List[Dict[str, Any]]: """Loads all records from a normalized table.""" table = model_class.__name__ if table == self._systemTableName: return self._loadSystemTable() try: if not self._ensureTableExists(model_class): return [] with self.connection.cursor() as cursor: cursor.execute(f'SELECT * FROM "{table}" ORDER BY "id"') records = [dict(row) for row in cursor.fetchall()] # Handle JSONB fields for all records fields = _get_model_fields(model_class) for record in records: for field_name, field_type in fields.items(): if field_type == "JSONB" and field_name in record: if record[field_name] is None: # Convert None to appropriate default based on field name if field_name in [ "logs", "messages", "tasks", "expectedDocumentFormats", "resultDocuments", ]: record[field_name] = [] elif field_name in ["execParameters", "stats"]: record[field_name] = {} else: record[field_name] = None else: import json try: if isinstance(record[field_name], str): # Parse JSON string back to Python object record[field_name] = json.loads( record[field_name] ) elif isinstance(record[field_name], (dict, list)): # Already a Python object, keep as is pass else: # Try to parse as JSON record[field_name] = json.loads( str(record[field_name]) ) except (json.JSONDecodeError, TypeError, ValueError): # If parsing fails, keep as string logger.warning( f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}" ) pass return records except Exception as e: logger.error(f"Error loading table {table}: {e}") return [] def _registerInitialId(self, table: str, initialId: str) -> bool: """Registers the initial ID for a table.""" try: systemData = self._loadSystemTable() if table not in systemData: systemData[table] = initialId success = self._saveSystemTable(systemData) if success: logger.info(f"Initial ID {initialId} for table {table} registered") return success else: # Table already has an initial ID registered logger.debug(f"Table {table} already has initial ID {systemData[table]}") return True except Exception as e: logger.error(f"Error registering the initial ID for table {table}: {e}") return False def _removeInitialId(self, table: str) -> bool: """Removes the initial ID for a table from the system table.""" try: systemData = self._loadSystemTable() if table in systemData: del systemData[table] success = self._saveSystemTable(systemData) if success: logger.info( f"Initial ID for table {table} removed from system table" ) return success return True # If not present, this is not an error except Exception as e: logger.error(f"Error removing initial ID for table {table}: {e}") return False def updateContext(self, userId: str) -> None: """Updates the context of the database connector.""" if userId is None: raise ValueError("userId must be provided") self.userId = userId # No cache to clear - database handles data consistency # Public API def getTables(self) -> List[str]: """Returns a list of all available tables.""" tables = [] try: # Ensure connection is alive self._ensure_connection() if not self.connection or self.connection.closed: logger.error("Database connection is not available") return tables with self.connection.cursor() as cursor: cursor.execute(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name """) rows = cursor.fetchall() tables = [row["table_name"] for row in rows] except Exception as e: logger.error(f"Error reading the database {self.dbDatabase}: {e}") return tables def getFields(self, model_class: type) -> List[str]: """Returns a list of all fields in a table.""" data = self._loadTable(model_class) if not data: return [] fields = list(data[0].keys()) if data else [] return fields def getSchema( self, model_class: type, language: str = None ) -> Dict[str, Dict[str, Any]]: """Returns a schema object for a table with data types and labels.""" data = self._loadTable(model_class) schema = {} if not data: return schema firstRecord = data[0] for field, value in firstRecord.items(): dataType = type(value).__name__ label = field schema[field] = {"type": dataType, "label": label} return schema def getRecordset( self, model_class: type, fieldFilter: List[str] = None, recordFilter: Dict[str, Any] = None, ) -> List[Dict[str, Any]]: """Returns a list of records from a table, filtered by criteria.""" table = model_class.__name__ try: if not self._ensureTableExists(model_class): return [] # Build WHERE clause from recordFilter where_conditions = [] where_values = [] if recordFilter: for field, value in recordFilter.items(): where_conditions.append(f'"{field}" = %s') where_values.append(value) # Build the query if where_conditions: where_clause = " WHERE " + " AND ".join(where_conditions) else: where_clause = "" query = f'SELECT * FROM "{table}"{where_clause} ORDER BY "id"' with self.connection.cursor() as cursor: cursor.execute(query, where_values) records = [dict(row) for row in cursor.fetchall()] # Handle JSONB fields and ensure numeric types are correct fields = _get_model_fields(model_class) for record in records: for field_name, field_type in fields.items(): # Ensure numeric fields (float/int) are properly typed # psycopg2 may return them as strings in some environments (e.g., Azure PostgreSQL) if field_type in ("DOUBLE PRECISION", "INTEGER") and field_name in record: value = record[field_name] if value is not None: try: if field_type == "DOUBLE PRECISION": record[field_name] = float(value) elif field_type == "INTEGER": record[field_name] = int(value) except (ValueError, TypeError): # If conversion fails, log warning but keep original value logger.warning( f"Could not convert {field_name} to {field_type} for record {record.get('id', 'unknown')}: {value}" ) elif field_type == "JSONB" and field_name in record: if record[field_name] is None: # Convert None to appropriate default based on field name if field_name in [ "logs", "messages", "tasks", "expectedDocumentFormats", "resultDocuments", ]: record[field_name] = [] elif field_name in ["execParameters", "stats"]: record[field_name] = {} else: record[field_name] = None else: import json try: if isinstance(record[field_name], str): # Parse JSON string back to Python object record[field_name] = json.loads( record[field_name] ) elif isinstance(record[field_name], (dict, list)): # Already a Python object, keep as is pass else: # Try to parse as JSON record[field_name] = json.loads( str(record[field_name]) ) except (json.JSONDecodeError, TypeError, ValueError): # If parsing fails, keep as string logger.warning( f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}" ) pass # If fieldFilter is available, reduce the fields if fieldFilter and isinstance(fieldFilter, list): result = [] for record in records: filteredRecord = {} for field in fieldFilter: if field in record: filteredRecord[field] = record[field] result.append(filteredRecord) return result return records except Exception as e: logger.error(f"Error loading records from table {table}: {e}") return [] def recordCreate( self, model_class: type, record: Union[Dict[str, Any], BaseModel] ) -> Dict[str, Any]: """Creates a new record in a table based on Pydantic model class.""" # If record is a Pydantic model, convert to dict if isinstance(record, BaseModel): record = record.model_dump() elif isinstance(record, dict): record = record.copy() else: raise ValueError("Record must be a Pydantic model or dictionary") # Ensure record has an ID if "id" not in record: record["id"] = str(uuid.uuid4()) # Save record self._saveRecord(model_class, record["id"], record) # Check if this is the first record in the table and register as initial ID table = model_class.__name__ existingInitialId = self.getInitialId(model_class) if existingInitialId is None: # This is the first record, register it as the initial ID self._registerInitialId(table, record["id"]) logger.info(f"Registered initial ID {record['id']} for table {table}") return record def recordModify( self, model_class: type, recordId: str, record: Union[Dict[str, Any], BaseModel] ) -> Dict[str, Any]: """Modifies an existing record in a table based on Pydantic model class.""" # Load existing record existingRecord = self._loadRecord(model_class, recordId) if not existingRecord: table = model_class.__name__ raise ValueError(f"Record {recordId} not found in table {table}") # If record is a Pydantic model, convert to dict if isinstance(record, BaseModel): record = record.model_dump() elif isinstance(record, dict): record = record.copy() else: raise ValueError("Record must be a Pydantic model or dictionary") # CRITICAL: Ensure we never modify the ID if "id" in record and str(record["id"]) != recordId: logger.error( f"Attempted to modify record ID from {recordId} to {record['id']}" ) raise ValueError( "Cannot modify record ID - it must match the provided recordId" ) # Update existing record with new data existingRecord.update(record) # Save updated record self._saveRecord(model_class, recordId, existingRecord) return existingRecord def recordDelete(self, model_class: type, recordId: str) -> bool: """Deletes a record from the table based on Pydantic model class.""" table = model_class.__name__ try: if not self._ensureTableExists(model_class): return False with self.connection.cursor() as cursor: # Check if record exists cursor.execute( f'SELECT "id" FROM "{table}" WHERE "id" = %s', (recordId,) ) if not cursor.fetchone(): return False # Check if it's an initial record initialId = self.getInitialId(model_class) if initialId is not None and initialId == recordId: self._removeInitialId(table) logger.info( f"Initial ID {recordId} for table {table} has been removed from the system table" ) # Delete the record cursor.execute(f'DELETE FROM "{table}" WHERE "id" = %s', (recordId,)) # No cache to update - database handles consistency self.connection.commit() return True except Exception as e: logger.error(f"Error deleting record {recordId} from table {table}: {e}") self.connection.rollback() return False def getInitialId(self, model_class: type) -> Optional[str]: """Returns the initial ID for a table.""" table = model_class.__name__ systemData = self._loadSystemTable() initialId = systemData.get(table) return initialId def close(self): """Close the database connection.""" if ( hasattr(self, "connection") and self.connection and not self.connection.closed ): self.connection.close() def __del__(self): """Cleanup method to close connection.""" try: self.close() except Exception: # Ignore errors during cleanup pass