# gateway/modules/shared/gdprDeletion.py
# 2026-01-25 23:57:41 +01:00
# 679 lines
# 24 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Generic GDPR data deletion engine.
Automatically discovers and deletes user data across all databases and tables.
Design:
- Schema-based: Inspects database schemas to find user-related columns
- Generic: Works with any new features/tables without code changes
- Safe: Anonymizes audit logs instead of deleting them
- Comprehensive: Covers all databases (App, Management, Chat, Feature-DBs)
"""
import logging
from typing import List, Dict, Any, Set, Tuple
from modules.shared.timeUtils import getUtcTimestamp
logger = logging.getLogger(__name__)
# Tables to SKIP (never delete from these)
PROTECTED_TABLES: Set[str] = {
    "_system", # System metadata table
    "UserInDB", # User account table (deleted separately at the end)
}
# Tables to ANONYMIZE instead of DELETE (for compliance)
# Matched by exact table name in deleteUserDataFromDatabase
ANONYMIZE_TABLES: Set[str] = {
    "AuditEvent", # Audit logs must be retained for compliance
    "AuthEvent", # Authentication logs must be retained for compliance
}
# User reference column patterns to search for
# A table column whose name exactly matches one of these entries is
# treated as a user reference during discovery
USER_COLUMNS: List[str] = [
    "userId",
    "createdBy",
    "usedBy",
    "revokedBy",
    "_createdBy", # framework-level "created by" metadata column
    "_modifiedBy", # framework-level "modified by" metadata column
]
def _getTableColumns(dbConnector, tableName: str) -> List[str]:
    """
    Get all column names for a table by inspecting the schema.
    Args:
        dbConnector: DatabaseConnector instance
        tableName: Name of the table
    Returns:
        List of column names in ordinal position order (empty list on error)
    """
    cursor = None
    try:
        query = """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = %s
            AND table_schema = 'public'
            ORDER BY ordinal_position
        """
        cursor = dbConnector.connection.cursor()
        cursor.execute(query, (tableName,))
        return [row[0] for row in cursor.fetchall()]
    except Exception as e:
        logger.error(f"Error getting columns for table {tableName}: {e}")
        return []
    finally:
        # Release the cursor even when execute()/fetchall() raises;
        # the original leaked it on the error path
        if cursor is not None:
            cursor.close()
def _getAllTables(dbConnector) -> List[str]:
    """
    Get all table names from a database, sorted by dependency order.
    Child tables (with foreign keys) come before parent tables so that
    deletions do not violate FK constraints.
    Args:
        dbConnector: DatabaseConnector instance
    Returns:
        List of table names in deletion order (protected tables excluded)
    """
    try:
        cursor = dbConnector.connection.cursor()
        try:
            # Get all base tables in the public schema
            query = """
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_type = 'BASE TABLE'
                ORDER BY table_name
            """
            cursor.execute(query)
            allTables = [row[0] for row in cursor.fetchall()]
            # Get foreign key relationships to determine dependency order
            fkQuery = """
                SELECT
                    tc.table_name,
                    ccu.table_name AS foreign_table_name
                FROM information_schema.table_constraints AS tc
                JOIN information_schema.key_column_usage AS kcu
                    ON tc.constraint_name = kcu.constraint_name
                    AND tc.table_schema = kcu.table_schema
                JOIN information_schema.constraint_column_usage AS ccu
                    ON ccu.constraint_name = tc.constraint_name
                    AND ccu.table_schema = tc.table_schema
                WHERE tc.constraint_type = 'FOREIGN KEY'
                AND tc.table_schema = 'public'
            """
            cursor.execute(fkQuery)
            foreignKeys = cursor.fetchall()
        finally:
            # Always release the cursor (original leaked it on error)
            cursor.close()
        # Build dependency graph (child -> parent mapping)
        dependencies: Dict[str, List[str]] = {}
        for childTable, parentTable in foreignKeys:
            dependencies.setdefault(childTable, []).append(parentTable)
        # Set for O(1) membership tests inside visit() (was O(n) list scan)
        knownTables = set(allTables)
        # Topological sort: parents are appended before their children,
        # then the order is reversed to get deletion order
        sortedTables: List[str] = []
        visited = set()
        def visit(table):
            if table in visited or table not in knownTables:
                return
            visited.add(table)
            # Visit dependencies first (parents); `visited` guards FK cycles
            for parent in dependencies.get(table, []):
                visit(parent)
            sortedTables.append(table)
        for table in allTables:
            visit(table)
        # Reverse to get deletion order (children before parents)
        sortedTables.reverse()
        # Filter out protected tables
        return [t for t in sortedTables if t not in PROTECTED_TABLES]
    except Exception as e:
        logger.error(f"Error getting tables from database: {e}")
        # Fallback: return simple list without ordering
        try:
            query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
            cursor = dbConnector.connection.cursor()
            try:
                cursor.execute(query)
                tables = [row[0] for row in cursor.fetchall()]
            finally:
                cursor.close()
            return [t for t in tables if t not in PROTECTED_TABLES]
        except Exception:
            return []
def _getPrimaryKeyColumns(dbConnector, tableName: str) -> List[str]:
    """
    Get primary key column(s) for a table.
    Args:
        dbConnector: DatabaseConnector instance
        tableName: Name of the table
    Returns:
        List of primary key column names (falls back to ["id"] on error)
    """
    try:
        query = """
            SELECT a.attname
            FROM pg_index i
            JOIN pg_attribute a ON a.attrelid = i.indrelid
            AND a.attnum = ANY(i.indkey)
            WHERE i.indrelid = %s::regclass
            AND i.indisprimary
        """
        # Quote the identifier before the ::regclass cast: an unquoted name
        # is case-folded to lower case by the cast, which made every
        # mixed-case table (e.g. "AuditEvent") fail and fall back to ["id"].
        # Embedded double quotes are doubled defensively.
        quotedName = '"' + tableName.replace('"', '""') + '"'
        cursor = dbConnector.connection.cursor()
        try:
            cursor.execute(query, (quotedName,))
            pkColumns = [row[0] for row in cursor.fetchall()]
        finally:
            # Release the cursor even when execute() raises
            cursor.close()
        return pkColumns
    except Exception as e:
        logger.debug(f"Could not get primary key for {tableName}: {e}")
        return ["id"] # Fallback to 'id'
def _findUserReferencesInTable(
    dbConnector,
    tableName: str,
    userId: str
) -> Dict[str, List[Tuple]]:
    """
    Find all records in a table that reference a user.
    Args:
        dbConnector: DatabaseConnector instance
        tableName: Name of the table
        userId: User ID to search for
    Returns:
        Dict mapping column names to lists of primary key tuples
        (empty dict when the table has no user columns or on error)
    """
    try:
        # Get all columns for this table
        columns = _getTableColumns(dbConnector, tableName)
        # Find user-related columns in this table
        userColumns = [col for col in columns if col in USER_COLUMNS]
        if not userColumns:
            return {}
        # Get primary key columns
        pkColumns = _getPrimaryKeyColumns(dbConnector, tableName)
        if not pkColumns:
            logger.warning(f"Table {tableName} has no primary key, skipping")
            return {}
        references: Dict[str, List[Tuple]] = {}
        # Hoisted: the PK select list is invariant across user columns
        pkSelect = ", ".join([f'"{pk}"' for pk in pkColumns])
        cursor = dbConnector.connection.cursor()
        try:
            for userColumn in userColumns:
                # Identifiers come from the schema catalog; the user value
                # is bound as a parameter
                query = f'SELECT {pkSelect} FROM "{tableName}" WHERE "{userColumn}" = %s'
                cursor.execute(query, (userId,))
                recordKeys = cursor.fetchall()
                if recordKeys:
                    references[userColumn] = recordKeys
                    logger.debug(f"Found {len(recordKeys)} records in {tableName}.{userColumn} for user {userId}")
        finally:
            # Release the cursor even when a query raises mid-loop
            # (the original leaked it on that path)
            cursor.close()
        return references
    except Exception as e:
        logger.error(f"Error finding user references in {tableName}: {e}")
        return {}
def _anonymizeRecords(
    dbConnector,
    tableName: str,
    columnName: str,
    recordKeys: List[Tuple],
    pkColumns: List[str],
    anonymousValue: str = "deleted_user"
) -> int:
    """
    Anonymize user references in records (set to 'deleted_user').
    Args:
        dbConnector: DatabaseConnector instance
        tableName: Name of the table
        columnName: Name of the column to anonymize
        recordKeys: List of primary key tuples
        pkColumns: List of primary key column names
        anonymousValue: Value to set (default: "deleted_user")
    Returns:
        Number of records anonymized (0 on error; transaction rolled back)
    """
    if not recordKeys:
        return 0
    try:
        # Hoisted out of the per-record loop: the schema lookup and the
        # UPDATE statement are invariant per table. The original re-ran
        # _getTableColumns (a schema query) for EVERY record key.
        columns = _getTableColumns(dbConnector, tableName)
        hasModifiedAt = "_modifiedAt" in columns
        whereClause = " AND ".join([f'"{pk}" = %s' for pk in pkColumns])
        if hasModifiedAt:
            query = f'UPDATE "{tableName}" SET "{columnName}" = %s, "_modifiedAt" = %s WHERE {whereClause}'
        else:
            query = f'UPDATE "{tableName}" SET "{columnName}" = %s WHERE {whereClause}'
        cursor = dbConnector.connection.cursor()
        try:
            count = 0
            for recordKey in recordKeys:
                # Timestamp taken per record, matching the original behavior
                if hasModifiedAt:
                    params = [anonymousValue, getUtcTimestamp()]
                else:
                    params = [anonymousValue]
                # Add primary key values to params (composite PKs arrive
                # as tuples, single-column PKs may arrive as bare values)
                if isinstance(recordKey, tuple):
                    params.extend(recordKey)
                else:
                    params.append(recordKey)
                cursor.execute(query, params)
                count += cursor.rowcount
            dbConnector.connection.commit()
        finally:
            # Release the cursor even when an UPDATE raises
            cursor.close()
        logger.info(f"Anonymized {count} records in {tableName}.{columnName}")
        return count
    except Exception as e:
        logger.error(f"Error anonymizing records in {tableName}.{columnName}: {e}")
        dbConnector.connection.rollback()
        return 0
def _deleteRecords(
    dbConnector,
    tableName: str,
    recordKeys: List[Tuple],
    pkColumns: List[str]
) -> int:
    """
    Delete records from a table.
    Args:
        dbConnector: DatabaseConnector instance
        tableName: Name of the table
        recordKeys: List of primary key tuples
        pkColumns: List of primary key column names
    Returns:
        Number of records deleted (0 on error; transaction rolled back)
    """
    if not recordKeys:
        return 0
    try:
        # Hoisted: the DELETE statement is invariant across record keys
        # (the original rebuilt the WHERE clause and query per record)
        whereClause = " AND ".join([f'"{pk}" = %s' for pk in pkColumns])
        query = f'DELETE FROM "{tableName}" WHERE {whereClause}'
        cursor = dbConnector.connection.cursor()
        try:
            count = 0
            for recordKey in recordKeys:
                # Composite PKs arrive as tuples, single-column PKs may
                # arrive as bare values
                if isinstance(recordKey, tuple):
                    params = list(recordKey)
                else:
                    params = [recordKey]
                cursor.execute(query, params)
                count += cursor.rowcount
            dbConnector.connection.commit()
        finally:
            # Release the cursor even when a DELETE raises
            cursor.close()
        logger.info(f"Deleted {count} records from {tableName}")
        return count
    except Exception as e:
        logger.error(f"Error deleting records from {tableName}: {e}")
        dbConnector.connection.rollback()
        return 0
def deleteUserDataFromDatabase(
    dbConnector,
    userId: str,
    databaseName: str
) -> Dict[str, Any]:
    """
    Delete or anonymize all user data from a single database.
    Tables listed in ANONYMIZE_TABLES get their user references overwritten;
    every other table referencing the user has the matching rows deleted.
    Args:
        dbConnector: DatabaseConnector instance
        userId: User ID to delete
        databaseName: Name of the database (for logging)
    Returns:
        Dict with deletion statistics
    """
    stats: Dict[str, Any] = {
        "database": databaseName,
        "tablesProcessed": 0,
        "recordsDeleted": 0,
        "recordsAnonymized": 0,
        "errors": []
    }
    try:
        # Discover every table in this database (FK-safe deletion order)
        tableNames = _getAllTables(dbConnector)
        logger.info(f"Processing {len(tableNames)} tables in {databaseName} for user {userId}")
        for tableName in tableNames:
            try:
                # Tables without a primary key cannot be addressed row-by-row
                pkColumns = _getPrimaryKeyColumns(dbConnector, tableName)
                if not pkColumns:
                    logger.debug(f"Skipping {tableName} - no primary key")
                    continue
                # Locate every row in this table that references the user
                userRefs = _findUserReferencesInTable(dbConnector, tableName, userId)
                if not userRefs:
                    continue
                stats["tablesProcessed"] += 1
                # Audit/log tables are anonymized; everything else is deleted
                anonymize = tableName in ANONYMIZE_TABLES
                for columnName, recordKeys in userRefs.items():
                    if anonymize:
                        stats["recordsAnonymized"] += _anonymizeRecords(
                            dbConnector, tableName, columnName, recordKeys, pkColumns
                        )
                    else:
                        stats["recordsDeleted"] += _deleteRecords(
                            dbConnector, tableName, recordKeys, pkColumns
                        )
            except Exception as tableErr:
                errorMsg = f"Error processing table {tableName}: {tableErr}"
                logger.error(errorMsg)
                stats["errors"].append(errorMsg)
        logger.info(f"Completed deletion in {databaseName}: {stats}")
        return stats
    except Exception as e:
        errorMsg = f"Error processing database {databaseName}: {e}"
        logger.error(errorMsg)
        stats["errors"].append(errorMsg)
        return stats
def deleteUserDataAcrossAllDatabases(userId: str, currentUser) -> Dict[str, Any]:
    """
    Delete or anonymize all user data across ALL databases.
    This is the main entry point for GDPR Article 17 (Right to Erasure).
    Features:
    - Automatically discovers all databases and tables
    - Schema-based: No hardcoded table lists
    - Safe: Anonymizes audit logs instead of deleting them
    - Comprehensive: Covers App, Management, Chat, and all Feature DBs
    Args:
        userId: User ID to delete
        currentUser: User object (for interface access)
    Returns:
        Dict with comprehensive deletion statistics
    """
    allStats: Dict[str, Any] = {
        "userId": userId,
        "deletedAt": getUtcTimestamp(),
        "databases": [],
        "totalTablesProcessed": 0,
        "totalRecordsDeleted": 0,
        "totalRecordsAnonymized": 0,
        "errors": []
    }

    def _merge(dbStats: Dict[str, Any]) -> None:
        # Fold a single database's stats into the grand totals.
        # (Replaces three byte-identical aggregation stanzas.)
        allStats["databases"].append(dbStats)
        allStats["totalTablesProcessed"] += dbStats["tablesProcessed"]
        allStats["totalRecordsDeleted"] += dbStats["recordsDeleted"]
        allStats["totalRecordsAnonymized"] += dbStats["recordsAnonymized"]
        allStats["errors"].extend(dbStats["errors"])

    try:
        # Import all database interfaces (kept local, as in the original,
        # presumably to avoid import cycles at module load)
        from modules.interfaces.interfaceDbApp import getRootInterface as getAppInterface
        from modules.interfaces.interfaceDbManagement import getInterface as getMgmtInterface
        from modules.interfaces.interfaceDbChat import getInterface as getChatInterface
        # 1. Process App DB (poweron_app)
        try:
            appInterface = getAppInterface()
            _merge(deleteUserDataFromDatabase(appInterface.db, userId, "poweron_app"))
        except Exception as appErr:
            errorMsg = f"Error processing App DB: {appErr}"
            logger.error(errorMsg)
            allStats["errors"].append(errorMsg)
        # 2. Process Management DB (poweron_management)
        try:
            mgmtInterface = getMgmtInterface(currentUser)
            _merge(deleteUserDataFromDatabase(mgmtInterface.db, userId, "poweron_management"))
        except Exception as mgmtErr:
            errorMsg = f"Error processing Management DB: {mgmtErr}"
            logger.error(errorMsg)
            allStats["errors"].append(errorMsg)
        # 3. Process Chat DB (poweron_chat)
        try:
            chatInterface = getChatInterface(currentUser)
            _merge(deleteUserDataFromDatabase(chatInterface.db, userId, "poweron_chat"))
        except Exception as chatErr:
            errorMsg = f"Error processing Chat DB: {chatErr}"
            logger.error(errorMsg)
            allStats["errors"].append(errorMsg)
        # 4. Process Feature DBs (discover dynamically); their stats arrive
        # pre-aggregated with different key names, so merged explicitly
        try:
            featureStats = _deleteUserDataFromFeatureDatabases(userId, currentUser)
            allStats["databases"].extend(featureStats["databases"])
            allStats["totalTablesProcessed"] += featureStats["totalTablesProcessed"]
            allStats["totalRecordsDeleted"] += featureStats["totalRecordsDeleted"]
            allStats["totalRecordsAnonymized"] += featureStats["totalRecordsAnonymized"]
            allStats["errors"].extend(featureStats["errors"])
        except Exception as featureErr:
            errorMsg = f"Error processing Feature DBs: {featureErr}"
            logger.error(errorMsg)
            allStats["errors"].append(errorMsg)
        # Log summary
        logger.info(f"GDPR deletion completed for user {userId}: "
                    f"{allStats['totalRecordsDeleted']} deleted, "
                    f"{allStats['totalRecordsAnonymized']} anonymized across "
                    f"{len(allStats['databases'])} databases")
        return allStats
    except Exception as e:
        logger.error(f"Fatal error in deleteUserDataAcrossAllDatabases: {e}")
        allStats["errors"].append(f"Fatal error: {e}")
        return allStats
def _deleteUserDataFromFeatureDatabases(userId: str, currentUser) -> Dict[str, Any]:
    """
    Delete user data from all feature-specific databases.
    Discovers feature interfaces dynamically.
    Args:
        userId: User ID to delete
        currentUser: User object
    Returns:
        Dict with deletion statistics
    """
    stats: Dict[str, Any] = {
        "databases": [],
        "totalTablesProcessed": 0,
        "totalRecordsDeleted": 0,
        "totalRecordsAnonymized": 0,
        "errors": []
    }

    # Lazy loaders, one per known feature code, so a feature module is
    # only imported when the user actually has that feature. Import
    # failures surface inside the per-feature try below.
    def _loadTrustee(user):
        from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
        return getTrusteeInterface(user)

    def _loadRealEstate(user):
        from modules.features.realestate.interfaceFeatureRealEstate import getInterface as getRealEstateInterface
        return getRealEstateInterface(user)

    def _loadChatbot(user):
        from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface
        return getChatbotInterface(user)

    def _loadNeutralizer(user):
        from modules.features.neutralization.interfaceFeatureNeutralizer import getInterface as getNeutralizerInterface
        return getNeutralizerInterface(user)

    interfaceLoaders = {
        "trustee": _loadTrustee,
        "realestate": _loadRealEstate,
        "chatbot": _loadChatbot,
        "neutralization": _loadNeutralizer,
    }

    try:
        # Determine which feature DBs to check via the user's feature accesses
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.datamodels.datamodelMembership import FeatureAccess
        from modules.datamodels.datamodelFeatures import FeatureInstance
        rootInterface = getRootInterface()
        featureAccesses = rootInterface.db.getRecordset(
            FeatureAccess,
            recordFilter={"userId": str(userId)}
        )
        # Resolve each access to its feature code (set deduplicates)
        featureCodes: Set[str] = set()
        for access in featureAccesses:
            instanceId = access.get("featureInstanceId")
            instanceRecords = rootInterface.db.getRecordset(
                FeatureInstance,
                recordFilter={"id": instanceId}
            )
            if instanceRecords:
                code = instanceRecords[0].get("featureCode")
                if code:
                    featureCodes.add(code)
        logger.info(f"Found {len(featureCodes)} feature types to process: {featureCodes}")
        # Process each feature type
        for featureCode in featureCodes:
            try:
                dbName = f"poweron_{featureCode}"
                loader = interfaceLoaders.get(featureCode)
                if loader is None:
                    logger.warning(f"No interface found for feature code: {featureCode}")
                    continue
                featureInterface = loader(currentUser)
                if featureInterface and hasattr(featureInterface, 'db'):
                    perDbStats = deleteUserDataFromDatabase(
                        featureInterface.db,
                        userId,
                        dbName
                    )
                    stats["databases"].append(perDbStats)
                    stats["totalTablesProcessed"] += perDbStats["tablesProcessed"]
                    stats["totalRecordsDeleted"] += perDbStats["recordsDeleted"]
                    stats["totalRecordsAnonymized"] += perDbStats["recordsAnonymized"]
                    stats["errors"].extend(perDbStats["errors"])
            except Exception as featureErr:
                errorMsg = f"Error processing feature {featureCode}: {featureErr}"
                logger.warning(errorMsg)
                stats["errors"].append(errorMsg)
        return stats
    except Exception as e:
        logger.error(f"Error in _deleteUserDataFromFeatureDatabases: {e}")
        stats["errors"].append(f"Feature DB error: {e}")
        return stats
def buildDeletionSummary(stats: Dict[str, Any]) -> List[str]:
    """
    Build a human-readable summary of the deletion operation.
    Args:
        stats: Statistics dict from deleteUserDataAcrossAllDatabases
    Returns:
        List of summary strings (one per database with activity, plus totals)
    """
    lines: List[str] = []
    for entry in stats.get("databases", []):
        deleted = entry.get("recordsDeleted", 0)
        anonymized = entry.get("recordsAnonymized", 0)
        # Only mention counts that are non-zero; skip fully quiet databases
        fragments = [
            text
            for count, text in (
                (deleted, f"{deleted} deleted"),
                (anonymized, f"{anonymized} anonymized"),
            )
            if count > 0
        ]
        if fragments:
            lines.append(f"{entry.get('database', 'unknown')}: {', '.join(fragments)}")
    # Grand totals always appear as the final line
    lines.append(
        f"Total: {stats.get('totalRecordsDeleted', 0)} deleted, "
        f"{stats.get('totalRecordsAnonymized', 0)} anonymized"
    )
    return lines