gateway/modules/datamodels/datamodelUam.py
2026-04-17 13:48:18 +02:00

413 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
UAM models: User, Mandate, UserConnection.
Multi-Tenant Design:
- User gehört NICHT direkt zu einem Mandanten
- Zugehörigkeit wird über UserMandate gesteuert (siehe datamodelMembership.py)
- isSysAdmin ist globales Admin-Flag für System-Zugriff (KEIN Daten-Zugriff!)
"""
import uuid
from typing import Optional, List, Dict, Any
from enum import Enum
from pydantic import BaseModel, Field, EmailStr, field_validator, computed_field
from modules.datamodels.datamodelBase import PowerOnModel
from modules.shared.i18nRegistry import i18nModel, normalizePrimaryLanguageTag
from modules.shared.timeUtils import getUtcTimestamp
class AuthAuthority(str, Enum):
LOCAL = "local"
GOOGLE = "google"
MSFT = "msft"
CLICKUP = "clickup"
class ConnectionStatus(str, Enum):
ACTIVE = "active"
EXPIRED = "expired"
REVOKED = "revoked"
PENDING = "pending"
class AccessLevel(str, Enum):
"""Access level enumeration for RBAC"""
ALL = "a" # All records
MY = "m" # My records (created by me)
GROUP = "g" # Group records (group context is the mandate)
NONE = "n" # No access
class UserPermissions(BaseModel):
"""User permissions model for RBAC"""
view: bool = Field(
default=False,
description="View permission: if true, item is visible/enabled"
)
read: AccessLevel = Field(
default=AccessLevel.NONE,
description="Read permission level"
)
create: AccessLevel = Field(
default=AccessLevel.NONE,
description="Create permission level"
)
update: AccessLevel = Field(
default=AccessLevel.NONE,
description="Update permission level"
)
delete: AccessLevel = Field(
default=AccessLevel.NONE,
description="Delete permission level"
)
@i18nModel("Mandant")
class Mandate(PowerOnModel):
"""
Mandate (Mandant/Tenant) model.
Ein Mandant ist ein isolierter Bereich für Daten und Berechtigungen.
"""
id: str = Field(
default_factory=lambda: str(uuid.uuid4()),
description="Unique ID of the mandate",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_visible": False, "frontend_required": False, "label": "ID"},
)
name: str = Field(
description="Name of the mandate",
json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": True, "label": "Name"},
)
label: Optional[str] = Field(
default=None,
description="Display label of the mandate",
json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": False, "label": "Label"},
)
enabled: bool = Field(
default=True,
description="Indicates whether the mandate is enabled",
json_schema_extra={"frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False, "label": "Aktiviert"},
)
isSystem: bool = Field(
default=False,
description="Whether this is a system mandate (e.g. root mandate). Cannot be deleted.",
json_schema_extra={"frontend_type": "checkbox", "frontend_readonly": True, "frontend_required": False, "label": "System-Mandant"},
)
deletedAt: Optional[float] = Field(
default=None,
description="Timestamp when the mandate was soft-deleted. After 30 days, hard-delete is triggered.",
json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False, "label": "Gelöscht am"},
)
@field_validator('isSystem', mode='before')
@classmethod
def _coerceIsSystem(cls, v):
"""Coerce None to False (for existing DB records without isSystem field)."""
if v is None:
return False
return v
@i18nModel("Benutzerverbindung")
class UserConnection(PowerOnModel):
id: str = Field(
default_factory=lambda: str(uuid.uuid4()),
description="Unique ID of the connection",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False, "label": "ID"},
)
userId: str = Field(
description="ID of the user this connection belongs to",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False, "label": "Benutzer-ID"},
)
authority: AuthAuthority = Field(
description="Authentication authority",
json_schema_extra={
"frontend_type": "select",
"frontend_readonly": True,
"frontend_required": False,
"frontend_options": "/api/connections/authorities/options",
"label": "Autorität",
},
)
externalId: str = Field(
description="User ID in the external system",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False, "label": "Externe ID"},
)
externalUsername: str = Field(
description="Username in the external system",
json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": False, "label": "Externer Benutzername"},
)
externalEmail: Optional[EmailStr] = Field(
None,
description="Email in the external system",
json_schema_extra={"frontend_type": "email", "frontend_readonly": False, "frontend_required": False, "label": "Externe E-Mail"},
)
status: ConnectionStatus = Field(
default=ConnectionStatus.ACTIVE,
description="Connection status",
json_schema_extra={
"frontend_type": "select",
"frontend_readonly": False,
"frontend_required": False,
"frontend_options": "/api/connections/statuses/options",
"label": "Status",
},
)
connectedAt: float = Field(
default_factory=getUtcTimestamp,
description="When the connection was established (UTC timestamp in seconds)",
json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False, "label": "Verbunden am"},
)
lastChecked: float = Field(
default_factory=getUtcTimestamp,
description="When the connection was last verified (UTC timestamp in seconds)",
json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False, "label": "Zuletzt geprüft"},
)
expiresAt: Optional[float] = Field(
None,
description="When the connection expires (UTC timestamp in seconds)",
json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False, "label": "Läuft ab am"},
)
tokenStatus: Optional[str] = Field(
None,
description="Current token status: active, expired, none",
json_schema_extra={
"frontend_type": "select",
"frontend_readonly": True,
"frontend_required": False,
"frontend_options": [
{"value": "active", "label": "Active"},
{"value": "expired", "label": "Expired"},
{"value": "none", "label": "None"},
],
"label": "Verbindungsstatus",
},
)
tokenExpiresAt: Optional[float] = Field(
None,
description="When the current token expires (UTC timestamp in seconds)",
json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False, "label": "Token läuft ab am"},
)
grantedScopes: Optional[List[str]] = Field(
None,
description="OAuth scopes granted for this connection",
json_schema_extra={"frontend_type": "list", "frontend_readonly": True, "frontend_required": False, "label": "Gewährte Berechtigungen"},
)
@computed_field
@property
def connectionReference(self) -> str:
"""Generate connection reference string in format: connection:{authority}:{username}"""
return f"connection:{self.authority.value}:{self.externalUsername}"
@computed_field
@property
def displayLabel(self) -> str:
"""Human-readable label for display in dropdowns"""
authorityLabels = {
"msft": "Microsoft",
"google": "Google",
"local": "Local",
"clickup": "ClickUp",
}
return f"{authorityLabels.get(self.authority.value, self.authority.value)}: {self.externalUsername}"
@i18nModel("Benutzer")
class User(PowerOnModel):
"""
User model.
Multi-Tenant Design:
- User gehört NICHT direkt zu einem Mandanten
- Zugehörigkeit wird über UserMandate gesteuert (siehe datamodelMembership.py)
- Rollen werden über UserMandateRole gesteuert
- isSysAdmin = System-Zugriff, KEIN Daten-Zugriff
"""
id: str = Field(
default_factory=lambda: str(uuid.uuid4()),
description="Unique ID of the user",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_visible": False, "frontend_required": False, "label": "ID"},
)
username: str = Field(
description="Username for login (immutable after creation)",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True, "label": "Benutzername"},
)
email: Optional[EmailStr] = Field(
default=None,
description="Email address of the user",
json_schema_extra={"frontend_type": "email", "frontend_readonly": False, "frontend_required": True, "label": "E-Mail"},
)
fullName: Optional[str] = Field(
default=None,
description="Full name of the user",
json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": False, "label": "Vollständiger Name"},
)
language: str = Field(
default="de",
description="Preferred UI language code (must exist as UiLanguageSet).",
json_schema_extra={
"frontend_type": "select",
"frontend_readonly": False,
"frontend_required": True,
"frontend_options": "/api/i18n/codes",
"label": "Sprache",
},
)
@field_validator('language', mode='before')
@classmethod
def _normalizeLanguage(cls, v):
"""Normalize to primary language subtag (28 letters); default remains ``de``."""
if v is None:
return "de"
langMap = {
'english': 'en', 'englisch': 'en',
'german': 'de', 'deutsch': 'de',
'french': 'fr', 'französisch': 'fr', 'francais': 'fr',
'italian': 'it', 'italienisch': 'it', 'italiano': 'it',
}
normalized = str(v).lower().strip()
if normalized in langMap:
return langMap[normalized]
return normalizePrimaryLanguageTag(normalized, "de")
enabled: bool = Field(
default=True,
description="Indicates whether the user is enabled",
json_schema_extra={"frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False, "label": "Aktiviert"},
)
isSysAdmin: bool = Field(
default=False,
description="Global SysAdmin flag. SysAdmin = System-Zugriff, KEIN Daten-Zugriff!",
json_schema_extra={"frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False, "label": "System-Admin"},
)
@field_validator('isSysAdmin', mode='before')
@classmethod
def _coerceIsSysAdmin(cls, v):
"""Konvertiert None zu False (für bestehende DB-Einträge ohne isSysAdmin Feld)."""
if v is None:
return False
return v
authenticationAuthority: AuthAuthority = Field(
default=AuthAuthority.LOCAL,
description="Primary authentication authority",
json_schema_extra={
"frontend_type": "select",
"frontend_readonly": True,
"frontend_required": False,
"frontend_options": "/api/connections/authorities/options",
"label": "Authentifizierung",
},
)
roleLabels: List[str] = Field(
default_factory=list,
description="Role labels (from DB or enriched when loading users)",
json_schema_extra={
"frontend_type": "multiselect",
"frontend_readonly": True,
"frontend_visible": False,
"frontend_required": False,
"label": "Rollen-Labels",
},
)
@i18nModel("Benutzerzugang")
class UserInDB(User):
"""User model with password hash for database storage."""
hashedPassword: Optional[str] = Field(
None,
description="Hash of the user password",
json_schema_extra={"label": "Passwort-Hash"},
)
resetToken: Optional[str] = Field(
None,
description="Password reset token (UUID)",
json_schema_extra={"label": "Reset-Token"},
)
resetTokenExpires: Optional[float] = Field(
None,
description="Reset token expiration (UTC timestamp in seconds)",
json_schema_extra={"label": "Token läuft ab"},
)
def _normalizeTtsVoiceMap(value: Any) -> Optional[Dict[str, str]]:
"""
Coerce ttsVoiceMap payloads to Dict[str, str].
UI/clients may send per-locale objects like {"voiceName": "de-DE-Chirp3-HD-Achird"};
storage and model field type are locale -> voice id string.
"""
if value is None:
return None
if not isinstance(value, dict):
return None
out: Dict[str, str] = {}
for rawKey, rawVal in value.items():
key = str(rawKey)
if rawVal is None:
continue
if isinstance(rawVal, str):
out[key] = rawVal
elif isinstance(rawVal, dict):
vn = rawVal.get("voiceName")
if vn is not None and str(vn).strip() != "":
out[key] = str(vn).strip()
else:
out[key] = str(rawVal)
return out if out else None
@i18nModel("Spracheinstellungen")
class UserVoicePreferences(PowerOnModel):
"""User-level voice/language preferences, shared across all features."""
id: str = Field(
default_factory=lambda: str(uuid.uuid4()),
description="Primary key",
json_schema_extra={"label": "ID"},
)
userId: str = Field(description="User ID", json_schema_extra={"label": "Benutzer-ID"})
mandateId: Optional[str] = Field(
default=None,
description="Mandate scope (None = global for user)",
json_schema_extra={"label": "Mandanten-ID"},
)
sttLanguage: str = Field(
default="de-DE",
description="Speech-to-text language code",
json_schema_extra={"label": "STT-Sprache"},
)
ttsLanguage: str = Field(
default="de-DE",
description="Text-to-speech language code",
json_schema_extra={"label": "TTS-Sprache"},
)
ttsVoice: Optional[str] = Field(
default=None,
description="Preferred TTS voice identifier",
json_schema_extra={"label": "TTS-Stimme"},
)
ttsVoiceMap: Optional[Dict[str, str]] = Field(
default=None,
description="Language-to-voice mapping",
json_schema_extra={"label": "Stimmen-Zuordnung"},
)
translationSourceLanguage: Optional[str] = Field(
default=None,
description="Source language for translations",
json_schema_extra={"label": "Übersetzung Quelle"},
)
translationTargetLanguage: Optional[str] = Field(
default=None,
description="Target language for translations",
json_schema_extra={"label": "Übersetzung Ziel"},
)
@field_validator("ttsVoiceMap", mode="before")
@classmethod
def _validateTtsVoiceMap(cls, value: Any) -> Optional[Dict[str, str]]:
return _normalizeTtsVoiceMap(value)