gateway/modules/interfaces/interfaceRbac.py
2026-02-12 00:34:17 +01:00

548 lines
23 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
RBAC helper functions for interfaces.
Provides RBAC filtering for database queries without connectors importing security.
Multi-Tenant Design:
- mandateId kommt aus Request-Context (X-Mandate-Id Header)
- GROUP-Filter verwendet expliziten mandateId Parameter
Data Namespace Structure:
- data.uam.{Table} → User Access Management (mandantenübergreifend)
- data.chat.{Table} → Chat/AI-Daten (benutzer-eigen, kein Mandantenkontext)
- data.files.{Table} → Dateien (benutzer-eigen)
- data.automation.{Table} → Automation (benutzer-eigen)
- data.feature.{code}.{Table} → Mandanten-/Feature-spezifische Daten (dynamisch)
GROUP-Berechtigung:
- data.uam.*: GROUP filtert nach Mandant (via UserMandate)
- data.chat.*, data.files.*, data.automation.*: GROUP = MY (benutzer-eigen, kein Mandantenkontext)
- data.feature.*: GROUP filtert nach mandateId/featureInstanceId
"""
import logging
import json
from typing import List, Dict, Any, Optional, Type
from pydantic import BaseModel
from modules.datamodels.datamodelRbac import AccessRuleContext
from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel
from modules.security.rbac import RbacClass
from modules.security.rootAccess import getRootDbAppConnector
logger = logging.getLogger(__name__)
# =============================================================================
# Namespace-Mapping für statische Tabellen
# =============================================================================
# Definiert, welcher Namespace für jede Tabelle verwendet wird.
# Tabellen ohne Eintrag fallen auf "system" zurück (Fallback für Rückwärtskompatibilität).
# =============================================================================
TABLE_NAMESPACE = {
# UAM (User Access Management) - mandantenübergreifend
"UserInDB": "uam",
"UserConnection": "uam",
"AuthEvent": "uam",
"Mandate": "uam",
"UserMandate": "uam",
"UserMandateRole": "uam",
"Invitation": "uam",
"Role": "uam",
"AccessRule": "uam",
"FeatureInstance": "uam",
"FeatureAccess": "uam",
"FeatureAccessRole": "uam",
# Chat - benutzer-eigen, kein Mandantenkontext
"ChatWorkflow": "chat",
"ChatMessage": "chat",
"ChatLog": "chat",
"ChatStat": "chat",
"ChatDocument": "chat",
"Prompt": "chat",
# Files - benutzer-eigen
"FileItem": "files",
"FileData": "files",
# Automation - benutzer-eigen
"AutomationDefinition": "automation",
"AutomationTemplate": "automation",
}
# Namespaces ohne Mandantenkontext - GROUP wird auf MY gemappt
USER_OWNED_NAMESPACES = {"chat", "files", "automation"}
def buildDataObjectKey(tableName: str, featureCode: Optional[str] = None) -> str:
"""
Build the standardized objectKey for a DATA context item.
Format:
- UAM tables: data.uam.{TableName}
- Chat tables: data.chat.{TableName}
- File tables: data.files.{TableName}
- Automation tables: data.automation.{TableName}
- Feature tables: data.feature.{featureCode}.{TableName}
Args:
tableName: The database table name (e.g., "UserInDB", "ChatWorkflow")
featureCode: Optional feature code (e.g., "trustee", "realestate")
If provided, uses data.feature.{featureCode}.{tableName}
Returns:
Full objectKey string (e.g., "data.uam.UserInDB", "data.chat.ChatWorkflow",
or "data.feature.trustee.TrusteePosition")
"""
if featureCode:
return f"data.feature.{featureCode}.{tableName}"
namespace = TABLE_NAMESPACE.get(tableName, "system") # Fallback für unbekannte Tabellen
return f"data.{namespace}.{tableName}"
def getRecordsetWithRBAC(
connector, # DatabaseConnector instance
modelClass: Type[BaseModel],
currentUser: User,
recordFilter: Dict[str, Any] = None,
orderBy: str = None,
limit: int = None,
mandateId: Optional[str] = None,
featureInstanceId: Optional[str] = None,
enrichPermissions: bool = False,
featureCode: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""
Get records with RBAC filtering applied at database level.
This function wraps connector.getRecordset() with RBAC logic.
Multi-Tenant Design:
- mandateId wird explizit übergeben (aus Request-Context / X-Mandate-Id Header)
Args:
connector: DatabaseConnector instance
modelClass: Pydantic model class for the table
currentUser: User object
recordFilter: Additional record filters
orderBy: Field to order by (defaults to "id")
limit: Maximum number of records to return
mandateId: Explicit mandate context (from request header). Required for GROUP access.
featureInstanceId: Explicit feature instance context
enrichPermissions: If True, adds _permissions field to each record with row-level
permissions { canUpdate, canDelete } based on RBAC rules and _createdBy
featureCode: Optional feature code for feature-specific tables (e.g., "trustee").
If None, table is treated as a system table.
Returns:
List of filtered records (with _permissions if enrichPermissions=True)
"""
table = modelClass.__name__
# Build full objectKey for RBAC lookup
objectKey = buildDataObjectKey(table, featureCode)
effectiveMandateId = mandateId
try:
if not connector._ensureTableExists(modelClass):
return []
# All users (including SysAdmins) go through RBAC filtering
# SysAdmin flag does NOT grant automatic data access - proper RBAC rules must exist
# Get RBAC permissions for this table using full objectKey
# AccessRule table is always in DbApp database
dbApp = getRootDbAppConnector()
rbacInstance = RbacClass(connector, dbApp=dbApp)
permissions = rbacInstance.getUserPermissions(
currentUser,
AccessRuleContext.DATA,
objectKey, # Use full objectKey (e.g., "data.uam.UserInDB", "data.chat.ChatWorkflow")
mandateId=effectiveMandateId,
featureInstanceId=featureInstanceId
)
# Check view permission first
if not permissions.view:
return []
# Build WHERE clause with RBAC filtering
whereConditions = []
whereValues = []
# CRITICAL: Only pass featureInstanceId to WHERE clause if the model actually has
# this column. Chat child tables (ChatMessage, ChatLog, ChatStat, ChatDocument)
# are user-owned and do NOT have featureInstanceId - only ChatWorkflow does.
# Without this check, the SQL query would reference a non-existent column,
# causing a silent error that returns empty results.
featureInstanceIdForQuery = featureInstanceId
if featureInstanceId and hasattr(modelClass, 'model_fields') and "featureInstanceId" not in modelClass.model_fields:
featureInstanceIdForQuery = None
# Add RBAC WHERE clause based on read permission
rbacWhereClause = buildRbacWhereClause(
permissions,
currentUser,
table,
connector,
mandateId=effectiveMandateId,
featureInstanceId=featureInstanceIdForQuery
)
if rbacWhereClause:
whereConditions.append(rbacWhereClause["condition"])
whereValues.extend(rbacWhereClause["values"])
# Add additional record filters
if recordFilter:
for field, value in recordFilter.items():
whereConditions.append(f'"{field}" = %s')
whereValues.append(value)
# Build the query
whereClause = ""
if whereConditions:
whereClause = " WHERE " + " AND ".join(whereConditions)
orderByClause = f' ORDER BY "{orderBy}"' if orderBy else ' ORDER BY "id"'
limitClause = f" LIMIT {limit}" if limit else ""
query = f'SELECT * FROM "{table}"{whereClause}{orderByClause}{limitClause}'
with connector.connection.cursor() as cursor:
cursor.execute(query, whereValues)
records = [dict(row) for row in cursor.fetchall()]
# Handle JSONB fields and ensure numeric types are correct
# Import the helper function from connector module
from modules.connectors.connectorDbPostgre import _get_model_fields
fields = _get_model_fields(modelClass)
for record in records:
for fieldName, fieldType in fields.items():
# Ensure numeric fields are properly typed
if fieldType in ("DOUBLE PRECISION", "INTEGER") and fieldName in record:
value = record[fieldName]
if value is not None:
try:
if fieldType == "DOUBLE PRECISION":
record[fieldName] = float(value)
elif fieldType == "INTEGER":
record[fieldName] = int(value)
except (ValueError, TypeError):
logger.warning(
f"Could not convert {fieldName} to {fieldType} for record {record.get('id', 'unknown')}: {value}"
)
elif fieldType == "JSONB" and fieldName in record:
if record[fieldName] is None:
# Generic type-based default: List types -> [], Dict types -> {}
# Interfaces handle domain-specific defaults
modelFields = modelClass.model_fields
fieldInfo = modelFields.get(fieldName)
if fieldInfo:
fieldAnnotation = fieldInfo.annotation
# Check if it's a List type
if (fieldAnnotation == list or
(hasattr(fieldAnnotation, "__origin__") and
fieldAnnotation.__origin__ is list)):
record[fieldName] = []
# Check if it's a Dict type
elif (fieldAnnotation == dict or
(hasattr(fieldAnnotation, "__origin__") and
fieldAnnotation.__origin__ is dict)):
record[fieldName] = {}
else:
record[fieldName] = None
else:
record[fieldName] = None
else:
try:
if isinstance(record[fieldName], str):
record[fieldName] = json.loads(record[fieldName])
elif isinstance(record[fieldName], (dict, list)):
pass
else:
record[fieldName] = json.loads(str(record[fieldName]))
except (json.JSONDecodeError, TypeError, ValueError):
logger.warning(
f"Could not parse JSONB field {fieldName}, keeping as string: {record[fieldName]}"
)
# Enrich records with row-level permissions if requested
if enrichPermissions:
records = _enrichRecordsWithPermissions(
records, permissions, currentUser
)
return records
except Exception as e:
logger.error(f"Error loading records with RBAC from table {table}: {e}")
return []
def buildRbacWhereClause(
permissions: UserPermissions,
currentUser: User,
table: str,
connector, # DatabaseConnector instance for connection access
mandateId: Optional[str] = None,
featureInstanceId: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""
Build RBAC WHERE clause based on permissions and access level.
Multi-Tenant Design:
- mandateId wird explizit übergeben (aus Request-Context / X-Mandate-Id Header)
- featureInstanceId wird für Feature-Tabellen zusätzlich gefiltert
Args:
permissions: UserPermissions object
currentUser: User object
table: Table name
connector: DatabaseConnector instance (needed for GROUP queries)
mandateId: Explicit mandate context (from request header). Required for GROUP access.
featureInstanceId: Feature instance context for feature-level data isolation.
Returns:
Dictionary with "condition" and "values" keys, or None if no filtering needed
"""
if not permissions or not hasattr(permissions, "read"):
return None
readLevel = permissions.read
# No access - return empty result condition
if readLevel == AccessLevel.NONE:
return {"condition": "1 = 0", "values": []}
# CRITICAL: featureInstanceId filter is ALWAYS required when provided
# This ensures data isolation between feature instances regardless of access level
baseConditions = []
baseValues = []
if featureInstanceId:
# Strict filter: only records for this exact feature instance
baseConditions.append('"featureInstanceId" = %s')
baseValues.append(featureInstanceId)
# All records within the feature instance - only featureInstanceId filtering
if readLevel == AccessLevel.ALL:
if baseConditions:
return {"condition": " AND ".join(baseConditions), "values": baseValues}
return None
# My records - filter by _createdBy or userId field
if readLevel == AccessLevel.MY:
# Try common field names for creator
userIdField = None
if table == "UserInDB":
userIdField = "id"
elif table == "UserConnection":
userIdField = "userId"
else:
userIdField = "_createdBy"
conditions = list(baseConditions)
values = list(baseValues)
conditions.append(f'"{userIdField}" = %s')
values.append(currentUser.id)
return {
"condition": " AND ".join(conditions),
"values": values
}
# Group records - filter by mandateId or ownership based on namespace
if readLevel == AccessLevel.GROUP:
# Determine namespace for this table
namespace = TABLE_NAMESPACE.get(table, "system")
# For user-owned namespaces (chat, files, automation):
# GROUP has no meaning - these tables have no mandate context
# But still apply featureInstanceId filter if provided
if namespace in USER_OWNED_NAMESPACES:
if baseConditions:
return {"condition": " AND ".join(baseConditions), "values": baseValues}
return None
# For UAM and other namespaces: GROUP filters by mandate
effectiveMandateId = mandateId
if not effectiveMandateId:
# Fall back to Root mandate (first mandate in system) for GROUP access
# This allows system-level tables to be accessed without explicit mandate context
try:
from modules.datamodels.datamodelUam import Mandate
dbApp = getRootDbAppConnector()
allMandates = dbApp.getRecordset(Mandate)
if allMandates:
effectiveMandateId = allMandates[0].get("id")
except Exception as e:
logger.error(f"Error getting Root mandate: {e}")
if not effectiveMandateId:
logger.warning(f"User {currentUser.id} has no mandateId for GROUP access")
return {"condition": "1 = 0", "values": []}
# For UserInDB: Filter via UserMandate junction table
# Multi-Tenant Design: Users do NOT have mandateId - they are linked via UserMandate
if table == "UserInDB":
try:
with connector.connection.cursor() as cursor:
# Get all user IDs that are members of the current mandate
cursor.execute(
'SELECT "userId" FROM "UserMandate" WHERE "mandateId" = %s AND "enabled" = true',
(effectiveMandateId,)
)
userMandates = cursor.fetchall()
userIds = [um["userId"] for um in userMandates]
if not userIds:
return {"condition": "1 = 0", "values": []}
placeholders = ",".join(["%s"] * len(userIds))
# Combine with base conditions (featureInstanceId)
conditions = list(baseConditions)
values = list(baseValues)
conditions.append(f'"id" IN ({placeholders})')
values.extend(userIds)
return {
"condition": " AND ".join(conditions) if conditions else f'"id" IN ({placeholders})',
"values": values
}
except Exception as e:
logger.error(f"Error building GROUP filter for UserInDB via UserMandate: {e}")
return {"condition": "1 = 0", "values": []}
# For UserConnection: Filter via UserMandate junction table
elif table == "UserConnection":
try:
with connector.connection.cursor() as cursor:
# Get all user IDs that are members of the current mandate
cursor.execute(
'SELECT "userId" FROM "UserMandate" WHERE "mandateId" = %s AND "enabled" = true',
(effectiveMandateId,)
)
userMandates = cursor.fetchall()
userIds = [um["userId"] for um in userMandates]
if not userIds:
return {"condition": "1 = 0", "values": []}
placeholders = ",".join(["%s"] * len(userIds))
# Combine with base conditions (featureInstanceId)
conditions = list(baseConditions)
values = list(baseValues)
conditions.append(f'"userId" IN ({placeholders})')
values.extend(userIds)
return {
"condition": " AND ".join(conditions) if conditions else f'"userId" IN ({placeholders})',
"values": values
}
except Exception as e:
logger.error(f"Error building GROUP filter for UserConnection: {e}")
return {"condition": "1 = 0", "values": []}
# For system tables without mandateId column (Mandate, Role, etc.):
# No row-level filtering based on mandate, but still apply featureInstanceId if provided
elif table in ("Mandate", "Role"):
if baseConditions:
return {"condition": " AND ".join(baseConditions), "values": baseValues}
return None
# For other tables, filter by mandateId field
# Also include records with NULL mandateId for backwards compatibility
else:
# Start with base conditions (includes strict featureInstanceId filter)
conditions = list(baseConditions)
values = list(baseValues)
# Add mandate filter
conditions.append('("mandateId" = %s OR "mandateId" IS NULL)')
values.append(effectiveMandateId)
return {
"condition": " AND ".join(conditions),
"values": values
}
# Unknown access level - deny access (security: deny by default)
logger.warning(f"Unknown access level '{readLevel}' for user {currentUser.id} - denying access")
return {"condition": "1 = 0", "values": []}
def _enrichRecordsWithPermissions(
records: List[Dict[str, Any]],
permissions: UserPermissions,
currentUser: User
) -> List[Dict[str, Any]]:
"""
Enrich records with per-row permissions (_permissions field).
The _permissions field contains:
- canUpdate: bool - whether current user can update this record
- canDelete: bool - whether current user can delete this record
Logic:
- AccessLevel.ALL ('a'): User can update/delete all records
- AccessLevel.MY ('m'): User can only update/delete records where _createdBy == userId
- AccessLevel.GROUP ('g'): Same as MY for now (group-level ownership)
- AccessLevel.NONE ('n'): User cannot update/delete any records
Args:
records: List of record dicts
permissions: UserPermissions with update/delete levels
currentUser: Current user object
Returns:
Records with _permissions field added
"""
enriched = []
userId = currentUser.id if currentUser else None
for record in records:
recordCopy = dict(record)
createdBy = record.get("_createdBy")
# Determine canUpdate
canUpdate = _checkRowPermission(permissions.update, userId, createdBy)
# Determine canDelete
canDelete = _checkRowPermission(permissions.delete, userId, createdBy)
recordCopy["_permissions"] = {
"canUpdate": canUpdate,
"canDelete": canDelete
}
enriched.append(recordCopy)
return enriched
def _checkRowPermission(
accessLevel: Optional[AccessLevel],
userId: Optional[str],
recordCreatedBy: Optional[str]
) -> bool:
"""
Check if user has permission for a specific row based on access level.
Args:
accessLevel: The permission level (ALL, MY, GROUP, NONE)
userId: Current user's ID
recordCreatedBy: The _createdBy value of the record
Returns:
True if user has permission, False otherwise
"""
if not accessLevel or accessLevel == AccessLevel.NONE:
return False
if accessLevel == AccessLevel.ALL:
return True
# MY and GROUP: Check ownership via _createdBy
if accessLevel in (AccessLevel.MY, AccessLevel.GROUP):
# If record has no _createdBy, allow access (can't verify ownership)
if not recordCreatedBy:
return True
# If no userId, can't verify - deny
if not userId:
return False
# Check ownership
return recordCreatedBy == userId
# Unknown level - deny by default
return False