gateway/modules/datamodels/datamodelUam.py
2026-04-10 22:44:08 +02:00

424 lines
16 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
UAM models: User, Mandate, UserConnection.
Multi-Tenant Design:
- A user does NOT belong directly to a mandate
- Membership is managed via UserMandate (see datamodelMembership.py)
- isSysAdmin is a global admin flag for system access (NO data access!)
"""
import uuid
from typing import Optional, List, Dict, Any
from enum import Enum
from pydantic import BaseModel, Field, EmailStr, field_validator, computed_field
from modules.datamodels.datamodelBase import PowerOnModel
from modules.shared.i18nRegistry import i18nModel
from modules.shared.timeUtils import getUtcTimestamp
class AuthAuthority(str, Enum):
    """Authentication authorities referenced by users and user connections.

    Members are plain strings (``str`` mixin), so they compare equal to
    their wire value, e.g. ``AuthAuthority.LOCAL == "local"``.
    """

    LOCAL = "local"
    GOOGLE = "google"
    MSFT = "msft"
    CLICKUP = "clickup"
class ConnectionStatus(str, Enum):
    """Status values of a user connection.

    Members are plain strings (``str`` mixin), so they compare equal to
    their wire value, e.g. ``ConnectionStatus.ACTIVE == "active"``.
    """

    ACTIVE = "active"
    EXPIRED = "expired"
    REVOKED = "revoked"
    PENDING = "pending"
class AccessLevel(str, Enum):
    """RBAC access levels, stored as single-character codes."""

    # Every record is accessible.
    ALL = "a"
    # Only records created by the current user.
    MY = "m"
    # Records of the group; the group context is the mandate.
    GROUP = "g"
    # No access at all.
    NONE = "n"
class UserPermissions(BaseModel):
    """RBAC permission set: a visibility flag plus one access level per CRUD action.

    Every permission defaults to the most restrictive value
    (``view=False``, all levels ``AccessLevel.NONE``).
    """

    # Visibility switch for the item.
    view: bool = Field(default=False, description="View permission: if true, item is visible/enabled")

    # Per-action access levels.
    read: AccessLevel = Field(default=AccessLevel.NONE, description="Read permission level")
    create: AccessLevel = Field(default=AccessLevel.NONE, description="Create permission level")
    update: AccessLevel = Field(default=AccessLevel.NONE, description="Update permission level")
    delete: AccessLevel = Field(default=AccessLevel.NONE, description="Delete permission level")
@i18nModel("Mandant")
class Mandate(PowerOnModel):
    """
    Mandate (tenant) model.

    A mandate is an isolated scope for data and permissions.
    """

    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Unique ID of the mandate",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_visible": False,
            "frontend_required": False,
            "label": "ID",
        },
    )
    name: str = Field(
        description="Name of the mandate",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": False,
            "frontend_required": True,
            "label": "Name",
        },
    )
    label: Optional[str] = Field(
        default=None,
        description="Display label of the mandate",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": False,
            "frontend_required": False,
            "label": "Label",
        },
    )
    enabled: bool = Field(
        default=True,
        description="Indicates whether the mandate is enabled",
        json_schema_extra={
            "frontend_type": "checkbox",
            "frontend_readonly": False,
            "frontend_required": False,
            "label": "Aktiviert",
        },
    )
    isSystem: bool = Field(
        default=False,
        description="Whether this is a system mandate (e.g. root mandate). Cannot be deleted.",
        json_schema_extra={
            "frontend_type": "checkbox",
            "frontend_readonly": True,
            "frontend_required": False,
            "label": "System-Mandant",
        },
    )
    deletedAt: Optional[float] = Field(
        default=None,
        description="Timestamp when the mandate was soft-deleted. After 30 days, hard-delete is triggered.",
        json_schema_extra={
            "frontend_type": "timestamp",
            "frontend_readonly": True,
            "frontend_required": False,
            "label": "Gelöscht am",
        },
    )

    @field_validator('isSystem', mode='before')
    @classmethod
    def _coerceIsSystem(cls, v):
        """Map a missing (None) flag to False; pass every other value through.

        Existing DB records may lack the isSystem field entirely.
        """
        return False if v is None else v
@i18nModel("Benutzerverbindung")
class UserConnection(PowerOnModel):
    """
    Connection between a user and an account at an authentication authority.

    Stores the external identity (ID, username, e-mail), the connection
    status, and token metadata. All timestamps are UTC timestamps in seconds.
    Two computed fields (``connectionReference`` and ``displayLabel``) are
    derived from ``authority`` and ``externalUsername``.
    """

    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Unique ID of the connection",
        json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False, "label": "ID"},
    )
    userId: str = Field(
        description="ID of the user this connection belongs to",
        json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False, "label": "Benutzer-ID"},
    )
    authority: AuthAuthority = Field(
        description="Authentication authority",
        json_schema_extra={
            "frontend_type": "select",
            "frontend_readonly": True,
            "frontend_required": False,
            "frontend_options": "/api/connections/authorities/options",
            "label": "Autorität",
        },
    )
    externalId: str = Field(
        description="User ID in the external system",
        json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False, "label": "Externe ID"},
    )
    externalUsername: str = Field(
        description="Username in the external system",
        json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": False, "label": "Externer Benutzername"},
    )
    externalEmail: Optional[EmailStr] = Field(
        None,
        description="Email in the external system",
        json_schema_extra={"frontend_type": "email", "frontend_readonly": False, "frontend_required": False, "label": "Externe E-Mail"},
    )
    status: ConnectionStatus = Field(
        default=ConnectionStatus.ACTIVE,
        description="Connection status",
        json_schema_extra={
            "frontend_type": "select",
            "frontend_readonly": False,
            "frontend_required": False,
            "frontend_options": "/api/connections/statuses/options",
            "label": "Status",
        },
    )
    # connectedAt and lastChecked both default to the creation time of the instance.
    connectedAt: float = Field(
        default_factory=getUtcTimestamp,
        description="When the connection was established (UTC timestamp in seconds)",
        json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False, "label": "Verbunden am"},
    )
    lastChecked: float = Field(
        default_factory=getUtcTimestamp,
        description="When the connection was last verified (UTC timestamp in seconds)",
        json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False, "label": "Zuletzt geprüft"},
    )
    expiresAt: Optional[float] = Field(
        None,
        description="When the connection expires (UTC timestamp in seconds)",
        json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False, "label": "Läuft ab am"},
    )
    tokenStatus: Optional[str] = Field(
        None,
        description="Current token status: active, expired, none",
        json_schema_extra={
            "frontend_type": "select",
            "frontend_readonly": True,
            "frontend_required": False,
            "frontend_options": [
                {"value": "active", "label": "Active"},
                {"value": "expired", "label": "Expired"},
                {"value": "none", "label": "None"},
            ],
            "label": "Verbindungsstatus",
        },
    )
    tokenExpiresAt: Optional[float] = Field(
        None,
        description="When the current token expires (UTC timestamp in seconds)",
        json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False, "label": "Token läuft ab am"},
    )
    grantedScopes: Optional[List[str]] = Field(
        None,
        description="OAuth scopes granted for this connection",
        json_schema_extra={"frontend_type": "list", "frontend_readonly": True, "frontend_required": False, "label": "Gewährte Berechtigungen"},
    )

    # NOTE: @computed_field must be applied exactly once per property.
    # It was previously stacked twice here, which wrapped an already-wrapped
    # descriptor instead of the property itself.
    @computed_field
    @property
    def connectionReference(self) -> str:
        """Generate connection reference string in format: connection:{authority}:{username}"""
        return f"connection:{self.authority.value}:{self.externalUsername}"

    @computed_field
    @property
    def displayLabel(self) -> str:
        """Human-readable label for display in dropdowns.

        Format: "<authority display name>: <external username>". Authorities
        without an entry in the label table fall back to their raw value.
        """
        authorityLabels = {
            "msft": "Microsoft",
            "google": "Google",
            "local": "Local",
            "clickup": "ClickUp",
        }
        return f"{authorityLabels.get(self.authority.value, self.authority.value)}: {self.externalUsername}"
@i18nModel("Benutzer")
class User(PowerOnModel):
    """
    User model.

    Multi-Tenant Design:
    - A user does NOT belong directly to a mandate
    - Membership is managed via UserMandate (see datamodelMembership.py)
    - Roles are managed via UserMandateRole
    - isSysAdmin = system access, NO data access
    """

    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Unique ID of the user",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_visible": False,
            "frontend_required": False,
            "label": "ID",
        },
    )
    username: str = Field(
        description="Username for login (immutable after creation)",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": True,
            "label": "Benutzername",
        },
    )
    email: Optional[EmailStr] = Field(
        default=None,
        description="Email address of the user",
        json_schema_extra={
            "frontend_type": "email",
            "frontend_readonly": False,
            "frontend_required": True,
            "label": "E-Mail",
        },
    )
    fullName: Optional[str] = Field(
        default=None,
        description="Full name of the user",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": False,
            "frontend_required": False,
            "label": "Vollständiger Name",
        },
    )
    language: str = Field(
        default="de",
        description="Preferred language of the user (ISO 639-1 code: de, en, fr, it)",
        json_schema_extra={
            "frontend_type": "select",
            "frontend_readonly": False,
            "frontend_required": True,
            "frontend_options": [
                {"value": "de", "label": "Deutsch"},
                {"value": "en", "label": "Englisch"},
                {"value": "fr", "label": "Französisch"},
                {"value": "it", "label": "Italienisch"},
            ],
            "label": "Sprache",
        },
    )
    enabled: bool = Field(
        default=True,
        description="Indicates whether the user is enabled",
        json_schema_extra={
            "frontend_type": "checkbox",
            "frontend_readonly": False,
            "frontend_required": False,
            "label": "Aktiviert",
        },
    )
    isSysAdmin: bool = Field(
        default=False,
        description="Global SysAdmin flag. SysAdmin = System-Zugriff, KEIN Daten-Zugriff!",
        json_schema_extra={
            "frontend_type": "checkbox",
            "frontend_readonly": False,
            "frontend_required": False,
            "label": "System-Admin",
        },
    )
    authenticationAuthority: AuthAuthority = Field(
        default=AuthAuthority.LOCAL,
        description="Primary authentication authority",
        json_schema_extra={
            "frontend_type": "select",
            "frontend_readonly": True,
            "frontend_required": False,
            "frontend_options": "/api/connections/authorities/options",
            "label": "Authentifizierung",
        },
    )
    roleLabels: List[str] = Field(
        default_factory=list,
        description="Role labels (from DB or enriched when loading users)",
        json_schema_extra={
            "frontend_type": "multiselect",
            "frontend_readonly": True,
            "frontend_visible": False,
            "frontend_required": False,
            "label": "Rollen-Labels",
        },
    )

    # --- "before" validators: normalize raw input ahead of type validation ---

    @field_validator('language', mode='before')
    @classmethod
    def _normalizeLanguage(cls, v):
        """Reduce the raw language value to one of de / en / fr / it.

        Language names in English, German, French or Italian are mapped to
        their code; matching ignores case and surrounding whitespace.
        None and anything unrecognized fall back to "de".
        """
        if v is None:
            return "de"
        # Spelled-out language names and the code each one stands for.
        aliases = {
            'english': 'en', 'englisch': 'en',
            'german': 'de', 'deutsch': 'de',
            'french': 'fr', 'französisch': 'fr', 'francais': 'fr',
            'italian': 'it', 'italienisch': 'it', 'italiano': 'it',
        }
        candidate = str(v).lower().strip()
        # An alias resolves to its code; everything else is checked as-is.
        code = aliases.get(candidate, candidate)
        return code if code in ('de', 'en', 'fr', 'it') else "de"

    @field_validator('isSysAdmin', mode='before')
    @classmethod
    def _coerceIsSysAdmin(cls, v):
        """Map a missing (None) flag to False; pass every other value through.

        Existing DB entries may lack the isSysAdmin field entirely.
        """
        return False if v is None else v
@i18nModel("Benutzerzugang")
class UserInDB(User):
    """Database-side user record: a User plus password hash and reset-token data."""

    # Hash of the user's password.
    hashedPassword: Optional[str] = Field(
        default=None,
        description="Hash of the user password",
        json_schema_extra={"label": "Passwort-Hash"},
    )

    # Password-reset token and its expiry.
    resetToken: Optional[str] = Field(
        default=None,
        description="Password reset token (UUID)",
        json_schema_extra={"label": "Reset-Token"},
    )
    resetTokenExpires: Optional[float] = Field(
        default=None,
        description="Reset token expiration (UTC timestamp in seconds)",
        json_schema_extra={"label": "Token läuft ab"},
    )
def _normalizeTtsVoiceMap(value: Any) -> Optional[Dict[str, str]]:
"""
Coerce ttsVoiceMap payloads to Dict[str, str].
UI/clients may send per-locale objects like {"voiceName": "de-DE-Chirp3-HD-Achird"};
storage and model field type are locale -> voice id string.
"""
if value is None:
return None
if not isinstance(value, dict):
return None
out: Dict[str, str] = {}
for rawKey, rawVal in value.items():
key = str(rawKey)
if rawVal is None:
continue
if isinstance(rawVal, str):
out[key] = rawVal
elif isinstance(rawVal, dict):
vn = rawVal.get("voiceName")
if vn is not None and str(vn).strip() != "":
out[key] = str(vn).strip()
else:
out[key] = str(rawVal)
return out if out else None
@i18nModel("Spracheinstellungen")
class UserVoicePreferences(PowerOnModel):
    """Per-user voice and language preferences that apply across all features."""

    # --- Identity / scope ---
    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Primary key",
        json_schema_extra={"label": "ID"},
    )
    userId: str = Field(
        description="User ID",
        json_schema_extra={"label": "Benutzer-ID"},
    )
    mandateId: Optional[str] = Field(
        default=None,
        description="Mandate scope (None = global for user)",
        json_schema_extra={"label": "Mandanten-ID"},
    )

    # --- Speech-to-text / text-to-speech ---
    sttLanguage: str = Field(
        default="de-DE",
        description="Speech-to-text language code",
        json_schema_extra={"label": "STT-Sprache"},
    )
    ttsLanguage: str = Field(
        default="de-DE",
        description="Text-to-speech language code",
        json_schema_extra={"label": "TTS-Sprache"},
    )
    ttsVoice: Optional[str] = Field(
        default=None,
        description="Preferred TTS voice identifier",
        json_schema_extra={"label": "TTS-Stimme"},
    )
    ttsVoiceMap: Optional[Dict[str, str]] = Field(
        default=None,
        description="Language-to-voice mapping",
        json_schema_extra={"label": "Stimmen-Zuordnung"},
    )

    # --- Translation ---
    translationSourceLanguage: Optional[str] = Field(
        default=None,
        description="Source language for translations",
        json_schema_extra={"label": "Übersetzung Quelle"},
    )
    translationTargetLanguage: Optional[str] = Field(
        default=None,
        description="Target language for translations",
        json_schema_extra={"label": "Übersetzung Ziel"},
    )

    @field_validator("ttsVoiceMap", mode="before")
    @classmethod
    def _validateTtsVoiceMap(cls, value: Any) -> Optional[Dict[str, str]]:
        """Run the raw ttsVoiceMap payload through _normalizeTtsVoiceMap before type validation."""
        normalizedMap = _normalizeTtsVoiceMap(value)
        return normalizedMap