# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Generic GDPR data deletion engine.

Automatically discovers and deletes user data across all databases and tables.

Design:
- Schema-based: Inspects database schemas to find user-related columns
- Generic: Works with any new features/tables without code changes
- Safe: Anonymizes audit logs instead of deleting them
- Comprehensive: Covers all databases (App, Management, Chat, Feature-DBs)
"""

import logging
from typing import List, Dict, Any, Set, Tuple

from modules.shared.timeUtils import getUtcTimestamp

logger = logging.getLogger(__name__)

# Tables to SKIP (never delete from these)
PROTECTED_TABLES = {
    "_system",   # System metadata table
    "UserInDB",  # User account table (deleted separately at the end)
}

# Tables to ANONYMIZE instead of DELETE (for compliance)
ANONYMIZE_TABLES = {
    "AuditEvent",  # Audit logs must be retained for compliance
    "AuthEvent",   # Authentication logs must be retained for compliance
}

# User reference column patterns to search for
USER_COLUMNS = [
    "userId",
    "createdBy",
    "usedBy",
    "revokedBy",
    "_createdBy",
    "_modifiedBy",
]


def _safeRollback(dbConnector) -> None:
    """
    Roll back the current transaction, ignoring any rollback failure.

    PostgreSQL aborts the whole transaction after any failed statement;
    without a rollback every later statement on the same connection fails
    with "current transaction is aborted". Call this from except-handlers
    of read helpers so one bad query cannot poison the rest of the run.

    Args:
        dbConnector: DatabaseConnector instance
    """
    try:
        dbConnector.connection.rollback()
    except Exception:
        # Connection may already be closed/broken; nothing more we can do.
        pass


def _getTableColumns(dbConnector, tableName: str) -> List[str]:
    """
    Get all column names for a table by inspecting the schema.

    Args:
        dbConnector: DatabaseConnector instance
        tableName: Name of the table

    Returns:
        List of column names (empty list on error)
    """
    try:
        query = """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = %s AND table_schema = 'public'
            ORDER BY ordinal_position
        """
        cursor = dbConnector.connection.cursor()
        cursor.execute(query, (tableName,))
        columns = [row[0] for row in cursor.fetchall()]
        cursor.close()
        return columns
    except Exception as e:
        logger.error(f"Error getting columns for table {tableName}: {e}")
        _safeRollback(dbConnector)  # clear a possibly aborted transaction
        return []


def _getAllTables(dbConnector) -> List[str]:
    """
    Get all table names from a database, sorted by dependency order.

    Child tables (with foreign keys) come before parent tables, so records
    can be deleted without violating foreign-key constraints.

    Args:
        dbConnector: DatabaseConnector instance

    Returns:
        List of table names in deletion order (protected tables excluded)
    """
    try:
        # Get all tables
        query = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
            ORDER BY table_name
        """
        cursor = dbConnector.connection.cursor()
        cursor.execute(query)
        allTables = [row[0] for row in cursor.fetchall()]

        # Get foreign key relationships to determine dependency order
        fkQuery = """
            SELECT tc.table_name, ccu.table_name AS foreign_table_name
            FROM information_schema.table_constraints AS tc
            JOIN information_schema.key_column_usage AS kcu
                ON tc.constraint_name = kcu.constraint_name
                AND tc.table_schema = kcu.table_schema
            JOIN information_schema.constraint_column_usage AS ccu
                ON ccu.constraint_name = tc.constraint_name
                AND ccu.table_schema = tc.table_schema
            WHERE tc.constraint_type = 'FOREIGN KEY'
                AND tc.table_schema = 'public'
        """
        cursor.execute(fkQuery)
        foreignKeys = cursor.fetchall()
        cursor.close()

        # Build dependency graph (child -> parent mapping)
        dependencies: Dict[str, List[str]] = {}
        for childTable, parentTable in foreignKeys:
            dependencies.setdefault(childTable, []).append(parentTable)

        # Sort tables by dependency (topological sort).
        # Parents are appended before their children; the `visited` set also
        # guards against FK cycles (self-references, mutual references).
        sortedTables: List[str] = []
        visited: Set[str] = set()

        def visit(table):
            if table in visited or table not in allTables:
                return
            visited.add(table)
            # Visit dependencies first (parents)
            for parent in dependencies.get(table, []):
                visit(parent)
            sortedTables.append(table)

        # Visit all tables
        for table in allTables:
            visit(table)

        # Reverse to get deletion order (children before parents)
        sortedTables.reverse()

        # Filter out protected tables
        return [t for t in sortedTables if t not in PROTECTED_TABLES]

    except Exception as e:
        logger.error(f"Error getting tables from database: {e}")
        _safeRollback(dbConnector)
        # Fallback: return simple list without ordering
        try:
            query = (
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
            )
            cursor = dbConnector.connection.cursor()
            cursor.execute(query)
            tables = [row[0] for row in cursor.fetchall()]
            cursor.close()
            return [t for t in tables if t not in PROTECTED_TABLES]
        except Exception:
            _safeRollback(dbConnector)
            return []


def _getPrimaryKeyColumns(dbConnector, tableName: str) -> List[str]:
    """
    Get primary key column(s) for a table.

    Uses a pg_class/pg_namespace join instead of a ``%s::regclass`` cast:
    the regclass input folds unquoted identifiers to lower case, so
    camelCase table names (e.g. "AuditEvent") would raise "relation does
    not exist" AND abort the transaction. The join form is error-free for
    any name and simply returns no rows for unknown tables.

    Args:
        dbConnector: DatabaseConnector instance
        tableName: Name of the table

    Returns:
        List of primary key column names (["id"] fallback on error)
    """
    try:
        query = """
            SELECT a.attname
            FROM pg_index i
            JOIN pg_class c ON c.oid = i.indrelid
            JOIN pg_namespace n ON n.oid = c.relnamespace
            JOIN pg_attribute a
                ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
            WHERE c.relname = %s
                AND n.nspname = 'public'
                AND i.indisprimary
        """
        cursor = dbConnector.connection.cursor()
        cursor.execute(query, (tableName,))
        pkColumns = [row[0] for row in cursor.fetchall()]
        cursor.close()
        return pkColumns
    except Exception as e:
        logger.debug(f"Could not get primary key for {tableName}: {e}")
        _safeRollback(dbConnector)
        return ["id"]  # Fallback to 'id'


def _findUserReferencesInTable(
    dbConnector, tableName: str, userId: str
) -> Dict[str, List[Tuple]]:
    """
    Find all records in a table that reference a user.

    Args:
        dbConnector: DatabaseConnector instance
        tableName: Name of the table
        userId: User ID to search for

    Returns:
        Dict mapping column names to lists of primary key tuples
    """
    try:
        # Get all columns for this table
        columns = _getTableColumns(dbConnector, tableName)

        # Find user-related columns in this table
        userColumns = [col for col in columns if col in USER_COLUMNS]
        if not userColumns:
            return {}

        # Get primary key columns
        pkColumns = _getPrimaryKeyColumns(dbConnector, tableName)
        if not pkColumns:
            logger.warning(f"Table {tableName} has no primary key, skipping")
            return {}

        references: Dict[str, List[Tuple]] = {}
        cursor = dbConnector.connection.cursor()

        # Table/column names come from the schema catalogs (trusted), so
        # quoting them directly is safe; the user ID stays parameterized.
        pkSelect = ", ".join([f'"{pk}"' for pk in pkColumns])

        for userColumn in userColumns:
            query = f'SELECT {pkSelect} FROM "{tableName}" WHERE "{userColumn}" = %s'
            cursor.execute(query, (userId,))
            recordKeys = cursor.fetchall()
            if recordKeys:
                references[userColumn] = recordKeys
                logger.debug(f"Found {len(recordKeys)} records in {tableName}.{userColumn} for user {userId}")

        cursor.close()
        return references

    except Exception as e:
        logger.error(f"Error finding user references in {tableName}: {e}")
        _safeRollback(dbConnector)
        return {}


def _anonymizeRecords(
    dbConnector, tableName: str, columnName: str,
    recordKeys: List[Tuple], pkColumns: List[str],
    anonymousValue: str = "deleted_user"
) -> int:
    """
    Anonymize user references in records (set to 'deleted_user').

    Args:
        dbConnector: DatabaseConnector instance
        tableName: Name of the table
        columnName: Name of the column to anonymize
        recordKeys: List of primary key tuples
        pkColumns: List of primary key column names
        anonymousValue: Value to set (default: "deleted_user")

    Returns:
        Number of records anonymized (0 on error; transaction rolled back)
    """
    if not recordKeys:
        return 0

    try:
        # Loop-invariant work hoisted out of the per-record loop: schema
        # lookup, WHERE clause, UPDATE statement, and one shared timestamp.
        whereClause = " AND ".join([f'"{pk}" = %s' for pk in pkColumns])
        columns = _getTableColumns(dbConnector, tableName)
        hasModifiedAt = "_modifiedAt" in columns

        if hasModifiedAt:
            query = f'UPDATE "{tableName}" SET "{columnName}" = %s, "_modifiedAt" = %s WHERE {whereClause}'
            baseParams = [anonymousValue, getUtcTimestamp()]
        else:
            query = f'UPDATE "{tableName}" SET "{columnName}" = %s WHERE {whereClause}'
            baseParams = [anonymousValue]

        cursor = dbConnector.connection.cursor()
        count = 0

        for recordKey in recordKeys:
            params = list(baseParams)
            # Add primary key values to params
            if isinstance(recordKey, tuple):
                params.extend(recordKey)
            else:
                params.append(recordKey)

            cursor.execute(query, params)
            count += cursor.rowcount

        dbConnector.connection.commit()
        cursor.close()

        logger.info(f"Anonymized {count} records in {tableName}.{columnName}")
        return count

    except Exception as e:
        logger.error(f"Error anonymizing records in {tableName}.{columnName}: {e}")
        dbConnector.connection.rollback()
        return 0


def _deleteRecords(
    dbConnector, tableName: str, recordKeys: List[Tuple], pkColumns: List[str]
) -> int:
    """
    Delete records from a table.

    Args:
        dbConnector: DatabaseConnector instance
        tableName: Name of the table
        recordKeys: List of primary key tuples
        pkColumns: List of primary key column names

    Returns:
        Number of records deleted (0 on error; transaction rolled back)
    """
    if not recordKeys:
        return 0

    try:
        # WHERE clause and statement are identical for every record; build once.
        whereClause = " AND ".join([f'"{pk}" = %s' for pk in pkColumns])
        query = f'DELETE FROM "{tableName}" WHERE {whereClause}'

        cursor = dbConnector.connection.cursor()
        count = 0

        for recordKey in recordKeys:
            params = list(recordKey) if isinstance(recordKey, tuple) else [recordKey]
            cursor.execute(query, params)
            count += cursor.rowcount

        dbConnector.connection.commit()
        cursor.close()

        logger.info(f"Deleted {count} records from {tableName}")
        return count

    except Exception as e:
        logger.error(f"Error deleting records from {tableName}: {e}")
        dbConnector.connection.rollback()
        return 0


def deleteUserDataFromDatabase(
    dbConnector, userId: str, databaseName: str
) -> Dict[str, Any]:
    """
    Delete or anonymize all user data from a single database.

    Args:
        dbConnector: DatabaseConnector instance
        userId: User ID to delete
        databaseName: Name of the database (for logging)

    Returns:
        Dict with deletion statistics
        (keys: database, tablesProcessed, recordsDeleted, recordsAnonymized, errors)
    """
    stats: Dict[str, Any] = {
        "database": databaseName,
        "tablesProcessed": 0,
        "recordsDeleted": 0,
        "recordsAnonymized": 0,
        "errors": []
    }

    try:
        # Get all tables in this database (children before parents)
        tables = _getAllTables(dbConnector)
        logger.info(f"Processing {len(tables)} tables in {databaseName} for user {userId}")

        for tableName in tables:
            try:
                # Get primary key columns for this table
                pkColumns = _getPrimaryKeyColumns(dbConnector, tableName)
                if not pkColumns:
                    logger.debug(f"Skipping {tableName} - no primary key")
                    continue

                # Find user references in this table
                references = _findUserReferencesInTable(dbConnector, tableName, userId)
                if not references:
                    continue

                stats["tablesProcessed"] += 1

                # Decide: Anonymize or Delete?
                shouldAnonymize = tableName in ANONYMIZE_TABLES

                for columnName, recordKeys in references.items():
                    if shouldAnonymize:
                        # Anonymize audit/log tables
                        count = _anonymizeRecords(
                            dbConnector, tableName, columnName, recordKeys, pkColumns
                        )
                        stats["recordsAnonymized"] += count
                    else:
                        # Delete from regular tables
                        count = _deleteRecords(dbConnector, tableName, recordKeys, pkColumns)
                        stats["recordsDeleted"] += count

            except Exception as tableErr:
                errorMsg = f"Error processing table {tableName}: {tableErr}"
                logger.error(errorMsg)
                stats["errors"].append(errorMsg)

        logger.info(f"Completed deletion in {databaseName}: {stats}")
        return stats

    except Exception as e:
        errorMsg = f"Error processing database {databaseName}: {e}"
        logger.error(errorMsg)
        stats["errors"].append(errorMsg)
        return stats


def _accumulateStats(totals: Dict[str, Any], dbStats: Dict[str, Any]) -> None:
    """
    Fold one per-database stats dict (from deleteUserDataFromDatabase)
    into an aggregate stats dict in place.

    Args:
        totals: Aggregate dict with databases/total*/errors keys
        dbStats: Per-database dict with tablesProcessed/records*/errors keys
    """
    totals["databases"].append(dbStats)
    totals["totalTablesProcessed"] += dbStats["tablesProcessed"]
    totals["totalRecordsDeleted"] += dbStats["recordsDeleted"]
    totals["totalRecordsAnonymized"] += dbStats["recordsAnonymized"]
    totals["errors"].extend(dbStats["errors"])


def deleteUserDataAcrossAllDatabases(userId: str, currentUser) -> Dict[str, Any]:
    """
    Delete or anonymize all user data across ALL databases.

    This is the main entry point for GDPR Article 17 (Right to Erasure).

    Features:
    - Automatically discovers all databases and tables
    - Schema-based: No hardcoded table lists
    - Safe: Anonymizes audit logs instead of deleting them
    - Comprehensive: Covers App, Management, Chat, and all Feature DBs

    Args:
        userId: User ID to delete
        currentUser: User object (for interface access)

    Returns:
        Dict with comprehensive deletion statistics
    """
    allStats: Dict[str, Any] = {
        "userId": userId,
        "deletedAt": getUtcTimestamp(),
        "databases": [],
        "totalTablesProcessed": 0,
        "totalRecordsDeleted": 0,
        "totalRecordsAnonymized": 0,
        "errors": []
    }

    try:
        # Import all database interfaces (local import avoids cycles)
        from modules.interfaces.interfaceDbApp import getRootInterface as getAppInterface
        from modules.interfaces.interfaceDbManagement import getInterface as getMgmtInterface
        from modules.interfaces.interfaceDbChat import getInterface as getChatInterface

        # 1. Process App DB (poweron_app)
        try:
            appInterface = getAppInterface()
            appStats = deleteUserDataFromDatabase(appInterface.db, userId, "poweron_app")
            _accumulateStats(allStats, appStats)
        except Exception as appErr:
            errorMsg = f"Error processing App DB: {appErr}"
            logger.error(errorMsg)
            allStats["errors"].append(errorMsg)

        # 2. Process Management DB (poweron_management)
        try:
            mgmtInterface = getMgmtInterface(currentUser)
            mgmtStats = deleteUserDataFromDatabase(mgmtInterface.db, userId, "poweron_management")
            _accumulateStats(allStats, mgmtStats)
        except Exception as mgmtErr:
            errorMsg = f"Error processing Management DB: {mgmtErr}"
            logger.error(errorMsg)
            allStats["errors"].append(errorMsg)

        # 3. Process Chat DB (poweron_chat)
        try:
            chatInterface = getChatInterface(currentUser)
            chatStats = deleteUserDataFromDatabase(chatInterface.db, userId, "poweron_chat")
            _accumulateStats(allStats, chatStats)
        except Exception as chatErr:
            errorMsg = f"Error processing Chat DB: {chatErr}"
            logger.error(errorMsg)
            allStats["errors"].append(errorMsg)

        # 4. Process Feature DBs (discover dynamically)
        try:
            featureStats = _deleteUserDataFromFeatureDatabases(userId, currentUser)
            allStats["databases"].extend(featureStats["databases"])
            allStats["totalTablesProcessed"] += featureStats["totalTablesProcessed"]
            allStats["totalRecordsDeleted"] += featureStats["totalRecordsDeleted"]
            allStats["totalRecordsAnonymized"] += featureStats["totalRecordsAnonymized"]
            allStats["errors"].extend(featureStats["errors"])
        except Exception as featureErr:
            errorMsg = f"Error processing Feature DBs: {featureErr}"
            logger.error(errorMsg)
            allStats["errors"].append(errorMsg)

        # Log summary
        logger.info(f"GDPR deletion completed for user {userId}: "
                    f"{allStats['totalRecordsDeleted']} deleted, "
                    f"{allStats['totalRecordsAnonymized']} anonymized across "
                    f"{len(allStats['databases'])} databases")

        return allStats

    except Exception as e:
        logger.error(f"Fatal error in deleteUserDataAcrossAllDatabases: {e}")
        allStats["errors"].append(f"Fatal error: {e}")
        return allStats


def _deleteUserDataFromFeatureDatabases(userId: str, currentUser) -> Dict[str, Any]:
    """
    Delete user data from all feature-specific databases.

    Discovers which feature DBs to visit from the user's FeatureAccess
    records, then dispatches to the matching feature interface.

    Args:
        userId: User ID to delete
        currentUser: User object

    Returns:
        Dict with deletion statistics
    """
    stats: Dict[str, Any] = {
        "databases": [],
        "totalTablesProcessed": 0,
        "totalRecordsDeleted": 0,
        "totalRecordsAnonymized": 0,
        "errors": []
    }

    try:
        # Get all feature instances for this user to determine which feature DBs to check
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.datamodels.datamodelMembership import FeatureAccess
        from modules.datamodels.datamodelFeatures import FeatureInstance

        rootInterface = getRootInterface()

        # Get all feature accesses for this user
        featureAccesses = rootInterface.db.getRecordset(
            FeatureAccess, recordFilter={"userId": str(userId)}
        )

        # Collect unique feature codes
        featureCodes: Set[str] = set()
        for fa in featureAccesses:
            instanceId = fa.get("featureInstanceId")
            instanceRecords = rootInterface.db.getRecordset(
                FeatureInstance, recordFilter={"id": instanceId}
            )
            if instanceRecords:
                featureCode = instanceRecords[0].get("featureCode")
                if featureCode:
                    featureCodes.add(featureCode)

        logger.info(f"Found {len(featureCodes)} feature types to process: {featureCodes}")

        # Process each feature type
        for featureCode in featureCodes:
            try:
                dbName = f"poweron_{featureCode}"

                # Try to get feature interface (static dispatch by feature code;
                # imports are local because feature modules are optional)
                featureInterface = None
                if featureCode == "trustee":
                    from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
                    featureInterface = getTrusteeInterface(currentUser)
                elif featureCode == "realestate":
                    from modules.features.realestate.interfaceFeatureRealEstate import getInterface as getRealEstateInterface
                    featureInterface = getRealEstateInterface(currentUser)
                elif featureCode == "chatbot":
                    from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface
                    featureInterface = getChatbotInterface(currentUser)
                elif featureCode == "neutralization":
                    from modules.features.neutralization.interfaceFeatureNeutralizer import getInterface as getNeutralizerInterface
                    featureInterface = getNeutralizerInterface(currentUser)
                else:
                    logger.warning(f"No interface found for feature code: {featureCode}")
                    continue

                if featureInterface and hasattr(featureInterface, 'db'):
                    featureStats = deleteUserDataFromDatabase(
                        featureInterface.db, userId, dbName
                    )
                    _accumulateStats(stats, featureStats)

            except Exception as featureErr:
                errorMsg = f"Error processing feature {featureCode}: {featureErr}"
                logger.warning(errorMsg)
                stats["errors"].append(errorMsg)

        return stats

    except Exception as e:
        logger.error(f"Error in _deleteUserDataFromFeatureDatabases: {e}")
        stats["errors"].append(f"Feature DB error: {e}")
        return stats


def buildDeletionSummary(stats: Dict[str, Any]) -> List[str]:
    """
    Build a human-readable summary of the deletion operation.

    Args:
        stats: Statistics dict from deleteUserDataAcrossAllDatabases

    Returns:
        List of summary strings (per-database lines plus a totals line)
    """
    summary = []

    for dbStats in stats.get("databases", []):
        dbName = dbStats.get("database", "unknown")
        deleted = dbStats.get("recordsDeleted", 0)
        anonymized = dbStats.get("recordsAnonymized", 0)

        if deleted > 0 or anonymized > 0:
            parts = []
            if deleted > 0:
                parts.append(f"{deleted} deleted")
            if anonymized > 0:
                parts.append(f"{anonymized} anonymized")
            summary.append(f"{dbName}: {', '.join(parts)}")

    # Add totals
    totalDeleted = stats.get("totalRecordsDeleted", 0)
    totalAnonymized = stats.get("totalRecordsAnonymized", 0)
    summary.append(f"Total: {totalDeleted} deleted, {totalAnonymized} anonymized")

    return summary