gateway/modules/shared/dbMultiTenantOptimizations.py
patrick-motsch 77151df0f4 fix: STT audioFormat scoping, automation import name, orphan FK cleanup
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-13 18:03:23 +01:00

540 lines
19 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Database optimizations for Multi-Tenant model.
Applies indexes, immutable triggers, and foreign key constraints
for the junction tables used in the multi-tenant mandate model.
Usage:
from modules.shared.dbMultiTenantOptimizations import applyMultiTenantOptimizations
# Call after database tables are created
applyMultiTenantOptimizations(dbConnector)
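All operations are idempotent (safe to call multiple times). Note that adding a
missing foreign key first DELETES rows whose foreign-key column points at a
non-existent parent row, so the first run can remove data.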
"""
import logging
from typing import Optional, List
# Module-level logger shared by the functions in this module.
logger = logging.getLogger(__name__)
def _getConnection(dbConnector):
    """Return the live connection of *dbConnector* with no transaction pending.

    Calls ``dbConnector._ensure_connection()``, then commits whatever
    transaction is still open on ``dbConnector.connection``. Callers therefore
    start from a clean state: an open transaction keeps its locks, and some
    drivers (psycopg2, for example) refuse to change ``autocommit`` while one
    is open.

    A failing commit is not silently ignored. It is logged at debug level and
    followed by a best-effort rollback, so the connection is not handed back
    stuck inside a failed transaction.

    Returns:
        The connector's DB-API connection object.
    """
    dbConnector._ensure_connection()
    conn = dbConnector.connection
    try:
        conn.commit()
    except Exception as commitErr:
        # Committing with nothing pending is expected to be a no-op, so this
        # branch signals a real failure (e.g. a broken connection or a
        # deferred-constraint violation). Record it and try to leave the
        # session idle instead of discarding the error entirely.
        logger.debug(f"Could not commit pending transaction: {commitErr}")
        try:
            conn.rollback()
        except Exception as rollbackErr:
            # Best effort only: a genuinely unusable connection will surface
            # on the caller's next statement.
            logger.debug(f"Rollback after failed commit also failed: {rollbackErr}")
    return conn
# =============================================================================
# Index Definitions
# =============================================================================
# Regular (non-unique) indexes created by _applyIndexes.
# Each entry is (tableName, indexName, columns). Table and column names are
# emitted double-quoted in CREATE INDEX, so they must match the exact
# (case-sensitive) identifiers. Existence is checked by index name only.
# NOTE(review): idx_usermandate_user, idx_featureaccess_user and
# idx_featureinstance_mandate are leading-column prefixes of composite
# indexes on the same tables (idx_usermandate_user_mandate,
# idx_featureaccess_user_instance, idx_featureinstance_mandate_code).
# PostgreSQL can usually serve the single-column lookups from the composite,
# so these may be redundant write overhead. Confirm with query plans before
# dropping anything.
_INDEXES = [
    # UserMandate indexes
    ("UserMandate", "idx_usermandate_user", ["userId"]),
    ("UserMandate", "idx_usermandate_user_mandate", ["userId", "mandateId"]),
    ("UserMandate", "idx_usermandate_mandate", ["mandateId"]),
    # UserMandateRole indexes
    ("UserMandateRole", "idx_usermandaterole_usermandate", ["userMandateId"]),
    ("UserMandateRole", "idx_usermandaterole_role", ["roleId"]),
    # FeatureAccess indexes
    ("FeatureAccess", "idx_featureaccess_user_instance", ["userId", "featureInstanceId"]),
    ("FeatureAccess", "idx_featureaccess_user", ["userId"]),
    ("FeatureAccess", "idx_featureaccess_instance", ["featureInstanceId"]),
    # FeatureAccessRole indexes
    ("FeatureAccessRole", "idx_featureaccessrole_featureaccess", ["featureAccessId"]),
    ("FeatureAccessRole", "idx_featureaccessrole_role", ["roleId"]),
    # AccessRule indexes
    ("AccessRule", "idx_accessrule_roleid", ["roleId"]),
    ("AccessRule", "idx_accessrule_context_roleid", ["context", "roleId"]),
    # Role indexes
    ("Role", "idx_role_mandate_instance", ["mandateId", "featureInstanceId"]),
    ("Role", "idx_role_label", ["roleLabel"]),
    # FeatureInstance indexes
    ("FeatureInstance", "idx_featureinstance_mandate", ["mandateId"]),
    ("FeatureInstance", "idx_featureinstance_mandate_code", ["mandateId", "featureCode"]),
    # Invitation indexes
    ("Invitation", "idx_invitation_mandate", ["mandateId"]),
    ("Invitation", "idx_invitation_createdby", ["createdBy"]),
]
# Unique indexes, created by _applyIndexes with CREATE UNIQUE INDEX.
# Same (tableName, indexName, columns) shape as _INDEXES. If existing rows
# already violate uniqueness, creation fails and is only logged as a warning.
_UNIQUE_INDEXES = [
    ("Invitation", "idx_invitation_token", ["token"]),
]
# Partial indexes, created by _applyIndexes as CREATE INDEX ... WHERE <clause>.
# Each entry is (tableName, indexName, columns, whereClause). The whereClause
# is inserted verbatim (neither quoted nor escaped), so identifiers inside it
# must already be double-quoted, as below.
_PARTIAL_INDEXES = [
    ("UserMandate", "idx_usermandate_user_enabled", ["userId"], '"enabled" = true'),
    ("Role", "idx_role_featurecode", ["featureCode"], '"mandateId" IS NULL'),
]
# =============================================================================
# Foreign Key Definitions
# =============================================================================
# Foreign keys created by _applyForeignKeys.
# Each entry is (tableName, constraintName, column, refTable, refColumn).
# Every constraint is added with ON DELETE CASCADE. Before a missing
# constraint is added, rows of tableName whose column points at a
# non-existent refTable row are deleted.
_FOREIGN_KEYS = [
    # UserMandate FKs
    ("UserMandate", "fk_usermandate_mandate", "mandateId", "Mandate", "id"),
    ("UserMandate", "fk_usermandate_user", "userId", "UserInDB", "id"),
    # FeatureInstance FKs
    ("FeatureInstance", "fk_featureinstance_mandate", "mandateId", "Mandate", "id"),
    # Role FKs: mandateId / featureInstanceId may be NULL. A foreign key does
    # not check NULL values, so such rows are unaffected; rows with a non-NULL
    # value are cascade-deleted together with the referenced row.
    ("Role", "fk_role_mandate", "mandateId", "Mandate", "id"),
    ("Role", "fk_role_instance", "featureInstanceId", "FeatureInstance", "id"),
    # FeatureAccess FKs
    ("FeatureAccess", "fk_featureaccess_instance", "featureInstanceId", "FeatureInstance", "id"),
    ("FeatureAccess", "fk_featureaccess_user", "userId", "UserInDB", "id"),
    # AccessRule FKs
    ("AccessRule", "fk_accessrule_role", "roleId", "Role", "id"),
    # Junction table FKs
    ("UserMandateRole", "fk_usermandaterole_usermandate", "userMandateId", "UserMandate", "id"),
    ("UserMandateRole", "fk_usermandaterole_role", "roleId", "Role", "id"),
    ("FeatureAccessRole", "fk_featureaccessrole_featureaccess", "featureAccessId", "FeatureAccess", "id"),
    ("FeatureAccessRole", "fk_featureaccessrole_role", "roleId", "Role", "id"),
    # Invitation FKs
    ("Invitation", "fk_invitation_mandate", "mandateId", "Mandate", "id"),
]
# =============================================================================
# Immutable Trigger Definitions
# =============================================================================
# Immutable-column triggers created by _applyImmutableTriggers.
# Each entry is (tableName, triggerName, immutableFields). For each entry a
# PL/pgSQL function fn_<triggerName> is created and attached as a BEFORE
# UPDATE ... FOR EACH ROW trigger. It raises an exception if any listed column
# changes value (compared with IS DISTINCT FROM, so NULL transitions count).
# Creation is skipped when a trigger with this name already exists, so
# editing a field list here does not update an already-installed trigger.
_IMMUTABLE_TRIGGERS = [
    # Role: mandateId, featureInstanceId, featureCode are immutable
    ("Role", "tr_role_immutable", ["mandateId", "featureInstanceId", "featureCode"]),
    # AccessRule: context, roleId are immutable
    ("AccessRule", "tr_accessrule_immutable", ["context", "roleId"]),
    # User: username is immutable (login name cannot be changed)
    ("UserInDB", "tr_user_immutable", ["username"]),
]
# =============================================================================
# Main Functions
# =============================================================================
def applyMultiTenantOptimizations(dbConnector, tables: Optional[List[str]] = None) -> dict:
    """
    Apply all multi-tenant database optimizations.

    Indexes are applied first, then foreign keys, then immutable triggers.
    For the duration of the call the connection is switched to autocommit.
    Each DDL statement then takes effect on its own, and a failing statement
    does not abort the following ones. The previous autocommit setting is
    restored afterwards.

    If autocommit cannot be enabled, all statements run inside one
    transaction. That transaction is committed explicitly at the end, or
    rolled back if an unexpected error escapes, so the shared connection is
    never left holding an open or failed transaction.

    Args:
        dbConnector: Connector exposing ``_ensure_connection()`` and a
            DB-API ``connection`` attribute.
        tables: Optional list of table names to optimize. If None (or empty),
            all known tables are optimized.

    Returns:
        dict with the counts "indexesCreated", "triggersCreated" and
        "foreignKeysCreated", plus an "errors" list holding the message of an
        unexpected top-level failure. Failures on individual objects are only
        logged, not listed here.
    """
    results = {
        "indexesCreated": 0,
        "triggersCreated": 0,
        "foreignKeysCreated": 0,
        "errors": []
    }
    try:
        conn = _getConnection(dbConnector)
        # Save the current autocommit mode so it can be restored afterwards.
        try:
            originalAutocommit = conn.autocommit
        except Exception:
            originalAutocommit = False
        autocommitEnabled = False
        try:
            conn.autocommit = True
            autocommitEnabled = True
        except Exception as autoErr:
            logger.debug(f"Could not set autocommit: {autoErr}")
        try:
            with conn.cursor() as cursor:
                results["indexesCreated"] = _applyIndexes(cursor, tables)
                results["foreignKeysCreated"] = _applyForeignKeys(cursor, tables)
                results["triggersCreated"] = _applyImmutableTriggers(cursor, tables)
            if not autocommitEnabled:
                # All DDL above ran in a single open transaction. Without an
                # explicit commit it (and the table locks it holds) would
                # linger until some unrelated commit on the shared connection.
                conn.commit()
            logger.info(
                f"Multi-tenant optimizations applied: "
                f"{results['indexesCreated']} indexes, "
                f"{results['triggersCreated']} triggers, "
                f"{results['foreignKeysCreated']} foreign keys"
            )
        except Exception:
            if not autocommitEnabled:
                # Don't leave the shared connection inside a failed transaction.
                try:
                    conn.rollback()
                except Exception as rollbackErr:
                    logger.debug(f"Rollback after failed optimizations also failed: {rollbackErr}")
            raise
        finally:
            # Restore the caller's autocommit mode (best effort).
            try:
                conn.autocommit = originalAutocommit
            except Exception as restoreErr:
                logger.debug(f"Could not restore autocommit: {restoreErr}")
    except Exception as e:
        logger.error(f"Error applying multi-tenant optimizations: {type(e).__name__}: {e}")
        results["errors"].append(str(e))
    return results
def applyIndexesOnly(dbConnector, tables: Optional[List[str]] = None) -> int:
    """Create only the missing indexes (lighter than applyMultiTenantOptimizations).

    Autocommit is enabled while the indexes are created, and the previous
    setting is restored afterwards. Any error, including failure to read or
    change autocommit, is logged and reported as zero created indexes.

    Args:
        dbConnector: Connector exposing ``_ensure_connection()`` and a
            DB-API ``connection`` attribute.
        tables: Optional table-name filter; None selects every table.

    Returns:
        Number of indexes newly created, or 0 on error.
    """
    try:
        connection = _getConnection(dbConnector)
        previousAutocommit = connection.autocommit
        connection.autocommit = True
        try:
            with connection.cursor() as cursor:
                createdCount = _applyIndexes(cursor, tables)
        finally:
            connection.autocommit = previousAutocommit
        return createdCount
    except Exception as e:
        logger.error(f"Error applying indexes: {e}")
        return 0
# =============================================================================
# Internal Implementation
# =============================================================================
def _tableExists(cursor, tableName: str) -> bool:
    """Report whether ``information_schema.tables`` lists a table named *tableName*.

    Only the bare table name is compared (no ``table_schema`` filter). Dict
    rows (e.g. from a RealDictCursor) and tuple rows are both accepted; a
    missing or empty row yields False.
    """
    existsQuery = """
        SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_name = %s
        ) AS exists
    """
    cursor.execute(existsQuery, (tableName,))
    result = cursor.fetchone()
    if not isinstance(result, dict):
        return result[0] if result else False
    return result.get('exists', False)
def _indexExists(cursor, indexName: str) -> bool:
    """Tell whether ``pg_indexes`` contains an index named *indexName*.

    Only the index name is compared; the owning table and schema are not.
    Accepts dict rows (RealDictCursor) and sequence rows alike.
    """
    cursor.execute(
        """
        SELECT EXISTS (
            SELECT FROM pg_indexes
            WHERE indexname = %s
        ) AS exists
        """,
        (indexName,),
    )
    fetched = cursor.fetchone()
    if isinstance(fetched, dict):
        found = fetched.get('exists', False)
    elif fetched:
        found = fetched[0]
    else:
        found = False
    return found
def _constraintExists(cursor, constraintName: str) -> bool:
    """Return True if ``pg_constraint`` has a constraint called *constraintName*.

    The lookup matches the name on any table. Both dict-style and tuple-style
    rows are handled; no row means False.
    """
    lookupSql = """
        SELECT EXISTS (
            SELECT FROM pg_constraint
            WHERE conname = %s
        ) AS exists
    """
    cursor.execute(lookupSql, (constraintName,))
    record = cursor.fetchone()
    if not record:
        # None, an empty tuple or an empty dict: nothing to report.
        return False
    if isinstance(record, dict):
        return record.get('exists', False)
    return record[0]
def _triggerExists(cursor, triggerName: str) -> bool:
    """Check ``pg_trigger`` for a trigger named *triggerName* on any table.

    Dict rows (RealDictCursor) and tuple rows are both supported; an absent
    row is treated as "not found".
    """
    params = (triggerName,)
    cursor.execute("""
        SELECT EXISTS (
            SELECT FROM pg_trigger
            WHERE tgname = %s
        ) AS exists
    """, params)
    answer = cursor.fetchone()
    if isinstance(answer, dict):
        value = answer.get('exists', False)
    else:
        value = answer[0] if answer else False
    return value
def _applyIndexes(cursor, tables: Optional[List[str]]) -> int:
    """Create every missing index from the three index tables.

    Processes ``_INDEXES`` (plain), then ``_UNIQUE_INDEXES`` (UNIQUE), then
    ``_PARTIAL_INDEXES`` (with a WHERE clause), in that order.

    An entry is skipped when:
    * its table is not in *tables* (a falsy *tables* selects everything);
    * the table does not exist;
    * an index with that name already exists.

    A failing CREATE is logged as a warning and does not stop the loop.

    Returns:
        Number of indexes newly created by this call.
    """
    # Each group: (label used in log messages, CREATE prefix, whether a WHERE
    # clause is appended, entries normalised to
    # (tableName, indexName, columns, whereClause-or-None)).
    indexGroups = (
        ("index", "CREATE INDEX", False,
         [(tbl, idx, cols, None) for tbl, idx, cols in _INDEXES]),
        ("unique index", "CREATE UNIQUE INDEX", False,
         [(tbl, idx, cols, None) for tbl, idx, cols in _UNIQUE_INDEXES]),
        ("partial index", "CREATE INDEX", True,
         [(tbl, idx, cols, where) for tbl, idx, cols, where in _PARTIAL_INDEXES]),
    )
    newlyCreated = 0
    for label, createPrefix, withWhere, entries in indexGroups:
        for tableName, indexName, columns, whereClause in entries:
            if tables and tableName not in tables:
                continue
            if not _tableExists(cursor, tableName) or _indexExists(cursor, indexName):
                continue
            try:
                quotedColumns = ", ".join(f'"{col}"' for col in columns)
                statement = f'{createPrefix} "{indexName}" ON "{tableName}" ({quotedColumns})'
                if withWhere:
                    statement = f"{statement} WHERE {whereClause}"
                cursor.execute(statement)
                newlyCreated += 1
                logger.debug(f"Created {label} {indexName} on {tableName}")
            except Exception as e:
                logger.warning(f"Failed to create {label} {indexName}: {e}")
    return newlyCreated
def _getForeignKeyTarget(cursor, constraintName: str) -> Optional[str]:
    """Return the name of the table referenced by FK constraint *constraintName*.

    Reads ``pg_constraint.confrelid`` directly, joined to ``pg_class`` for the
    table name. The ``information_schema`` constraint views are avoided
    because they only list constraints on tables the current role owns or
    holds certain privileges on. For a less-privileged role they would return
    nothing even when ``pg_constraint`` (used by ``_constraintExists``) shows
    the constraint, and the caller would then needlessly drop and recreate
    the foreign key. Querying the same catalog keeps both checks consistent.

    Only the constraint name is matched. If several tables carry a foreign
    key with this name, an arbitrary one is returned (``LIMIT 1``).

    Returns:
        The referenced table's unqualified name, or None if no foreign key
        constraint with that name exists. Dict rows (RealDictCursor-style)
        and tuple rows are both supported.
    """
    cursor.execute("""
        SELECT ref_tbl.relname AS foreign_table_name
        FROM pg_constraint AS fk_con
        JOIN pg_class AS ref_tbl
            ON ref_tbl.oid = fk_con.confrelid
        WHERE fk_con.contype = 'f'
          AND fk_con.conname = %s
        LIMIT 1
    """, (constraintName,))
    row = cursor.fetchone()
    if not row:
        return None
    if isinstance(row, dict):
        return row.get('foreign_table_name')
    return row[0]
def _applyForeignKeys(cursor, tables: Optional[List[str]]) -> int:
    """Apply foreign key constraints with ON DELETE CASCADE. Returns count created.

    Considers each ``_FOREIGN_KEYS`` entry whose table is selected (a falsy
    *tables* selects everything) and whose table and referenced table both
    exist. For such an entry:

    * a constraint of that name that already references ``refTable`` is
      left alone;
    * a constraint of that name that references a different table is
      dropped and recreated;
    * before the constraint is (re)created, rows whose FK column is non-NULL
      but matches no ``refTable.refColumn`` value are DELETED, so that
      ADD CONSTRAINT can validate. NOTE: if cascading foreign keys already
      point at the cleaned table, these deletes cascade further.

    Failures are logged as warnings and the affected entry is skipped.
    """
    created = 0
    for tableName, constraintName, column, refTable, refColumn in _FOREIGN_KEYS:
        if tables and tableName not in tables:
            continue
        if not _tableExists(cursor, tableName) or not _tableExists(cursor, refTable):
            continue
        if _constraintExists(cursor, constraintName):
            # Verify it points to the correct table.
            currentTarget = _getForeignKeyTarget(cursor, constraintName)
            if currentTarget == refTable:
                continue
            # FK exists but points to the wrong table: drop it and recreate.
            logger.info(f"FK {constraintName} points to {currentTarget}, expected {refTable} - recreating")
            try:
                cursor.execute(f'ALTER TABLE "{tableName}" DROP CONSTRAINT "{constraintName}"')
            except Exception as e:
                logger.warning(f"Failed to drop FK {constraintName}: {e}")
                continue
        # Orphaned rows would make ADD CONSTRAINT fail validation.
        _deleteOrphanRows(cursor, tableName, column, refTable, refColumn, constraintName)
        try:
            cursor.execute(f"""
                ALTER TABLE "{tableName}"
                ADD CONSTRAINT "{constraintName}"
                FOREIGN KEY ("{column}")
                REFERENCES "{refTable}"("{refColumn}")
                ON DELETE CASCADE
            """)
            created += 1
            logger.debug(f"Created FK {constraintName} on {tableName}")
        except Exception as e:
            logger.warning(f"Failed to create FK {constraintName}: {e}")
    return created


def _deleteOrphanRows(cursor, tableName: str, column: str, refTable: str,
                      refColumn: str, constraintName: str) -> None:
    """Best-effort delete of *tableName* rows whose non-NULL *column* has no match in refTable.refColumn.

    Uses a correlated NOT EXISTS against the actual referenced column. A
    ``NOT IN (subquery)`` form would ignore *refColumn* if written against a
    fixed column. It would also, by SQL NULL semantics, delete nothing as soon
    as the subquery yields a NULL, and it tends to plan poorly on large
    tables. Failures are logged as warnings, not raised.
    """
    try:
        cursor.execute(f"""
            DELETE FROM "{tableName}" AS child_row
            WHERE child_row."{column}" IS NOT NULL
              AND NOT EXISTS (
                  SELECT 1 FROM "{refTable}" AS parent_row
                  WHERE parent_row."{refColumn}" = child_row."{column}"
              )
        """)
        # rowcount may be -1 (unknown) or None on some drivers; treat both as "none".
        orphanCount = cursor.rowcount or 0
        if orphanCount > 0:
            logger.info(
                f"Cleaned {orphanCount} orphaned row(s) from {tableName} "
                f"(missing {refTable} reference via {column})"
            )
    except Exception as e:
        logger.warning(f"Failed to clean orphans for FK {constraintName}: {e}")
def _applyImmutableTriggers(cursor, tables: Optional[List[str]]) -> int:
    """Install BEFORE UPDATE triggers that reject changes to immutable columns.

    Considers every ``_IMMUTABLE_TRIGGERS`` entry whose table is selected
    (a falsy *tables* selects everything), exists, and does not yet have the
    named trigger. For each such entry this:

    * creates (or replaces) a PL/pgSQL function ``fn_<triggerName>`` that
      raises an exception when any listed column changes;
    * attaches that function as a row-level BEFORE UPDATE trigger.

    Failures are logged as warnings and the entry is skipped.

    Returns:
        Number of triggers created by this call.
    """
    newTriggers = 0
    for tableName, triggerName, immutableFields in _IMMUTABLE_TRIGGERS:
        selected = not tables or tableName in tables
        if not selected or not _tableExists(cursor, tableName) or _triggerExists(cursor, triggerName):
            continue
        functionName = f"fn_{triggerName}"
        # One IF ... RAISE guard per protected column.
        functionBody = "\n".join(
            f"""
            IF OLD."{field}" IS DISTINCT FROM NEW."{field}" THEN
                RAISE EXCEPTION '{field} is immutable on {tableName}. Delete and recreate instead.';
            END IF;
            """
            for field in immutableFields
        )
        try:
            cursor.execute(f"""
                CREATE OR REPLACE FUNCTION "{functionName}"()
                RETURNS TRIGGER AS $$
                BEGIN
                {functionBody}
                RETURN NEW;
                END;
                $$ LANGUAGE plpgsql
            """)
            cursor.execute(f"""
                CREATE TRIGGER "{triggerName}"
                BEFORE UPDATE ON "{tableName}"
                FOR EACH ROW
                EXECUTE FUNCTION "{functionName}"()
            """)
            newTriggers += 1
            logger.debug(f"Created immutable trigger {triggerName} on {tableName}")
        except Exception as e:
            logger.warning(f"Failed to create trigger {triggerName}: {e}")
    return newTriggers
# =============================================================================
# Utility: Check optimization status
# =============================================================================
def getOptimizationStatus(dbConnector) -> dict:
    """
    Check which optimizations are already applied.

    Only objects whose table currently exists are reported; objects on
    missing tables appear in neither list. A foreign key counts as applied
    only when a constraint with that name exists AND references the expected
    table. This mirrors the check ``_applyForeignKeys`` makes before deciding
    to drop and recreate a constraint.

    After the catalog queries the read transaction is ended with a commit, so
    the shared connection is not left idle inside an open transaction (which
    matters when it is not in autocommit mode). On error the problem is
    logged, a best-effort rollback is issued, and the status collected so
    far is returned.

    Returns:
        dict mapping each category ("indexes", "uniqueIndexes",
        "partialIndexes", "foreignKeys", "triggers") to a dict with
        "applied" and "missing" lists of object names.
    """
    status = {
        "indexes": {"applied": [], "missing": []},
        "uniqueIndexes": {"applied": [], "missing": []},
        "partialIndexes": {"applied": [], "missing": []},
        "foreignKeys": {"applied": [], "missing": []},
        "triggers": {"applied": [], "missing": []}
    }
    conn = None
    try:
        conn = _getConnection(dbConnector)
        with conn.cursor() as cursor:
            _recordPresence(cursor, status["indexes"],
                            [(tbl, name) for tbl, name, _ in _INDEXES], _indexExists)
            _recordPresence(cursor, status["uniqueIndexes"],
                            [(tbl, name) for tbl, name, _ in _UNIQUE_INDEXES], _indexExists)
            _recordPresence(cursor, status["partialIndexes"],
                            [(tbl, name) for tbl, name, _, _ in _PARTIAL_INDEXES], _indexExists)
            for tableName, constraintName, _, refTable, _ in _FOREIGN_KEYS:
                if not _tableExists(cursor, tableName):
                    continue
                # A same-named FK that targets another table would be
                # recreated by _applyForeignKeys, so it is not "applied" yet.
                applied = (
                    _constraintExists(cursor, constraintName)
                    and _getForeignKeyTarget(cursor, constraintName) == refTable
                )
                status["foreignKeys"]["applied" if applied else "missing"].append(constraintName)
            _recordPresence(cursor, status["triggers"],
                            [(tbl, name) for tbl, name, _ in _IMMUTABLE_TRIGGERS], _triggerExists)
        # End the read-only transaction the SELECTs may have opened.
        conn.commit()
    except Exception as e:
        logger.error(f"Error checking optimization status: {e}")
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollbackErr:
                logger.debug(f"Rollback after status check failed: {rollbackErr}")
    return status


def _recordPresence(cursor, bucket: dict, specs, existsCheck) -> None:
    """Sort each (tableName, objectName) in *specs* into bucket["applied"] or bucket["missing"], skipping absent tables."""
    for tableName, objectName in specs:
        if not _tableExists(cursor, tableName):
            continue
        bucket["applied" if existsCheck(cursor, objectName) else "missing"].append(objectName)