# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ UAM models: User, Mandate, UserConnection. Multi-Tenant Design: - User gehört NICHT direkt zu einem Mandanten - Zugehörigkeit wird über UserMandate gesteuert (siehe datamodelMembership.py) - isSysAdmin ist globales Admin-Flag für System-Zugriff (KEIN Daten-Zugriff!) """ import uuid from typing import Optional, List from enum import Enum from pydantic import BaseModel, Field, EmailStr, field_validator, computed_field from modules.shared.attributeUtils import registerModelLabels from modules.shared.timeUtils import getUtcTimestamp class AuthAuthority(str, Enum): LOCAL = "local" GOOGLE = "google" MSFT = "msft" class ConnectionStatus(str, Enum): ACTIVE = "active" EXPIRED = "expired" REVOKED = "revoked" PENDING = "pending" class AccessLevel(str, Enum): """Access level enumeration for RBAC""" ALL = "a" # All records MY = "m" # My records (created by me) GROUP = "g" # Group records (group context is the mandate) NONE = "n" # No access class UserPermissions(BaseModel): """User permissions model for RBAC""" view: bool = Field( default=False, description="View permission: if true, item is visible/enabled" ) read: AccessLevel = Field( default=AccessLevel.NONE, description="Read permission level" ) create: AccessLevel = Field( default=AccessLevel.NONE, description="Create permission level" ) update: AccessLevel = Field( default=AccessLevel.NONE, description="Update permission level" ) delete: AccessLevel = Field( default=AccessLevel.NONE, description="Delete permission level" ) class Mandate(BaseModel): """ Mandate (Mandant/Tenant) model. Ein Mandant ist ein isolierter Bereich für Daten und Berechtigungen. """ id: str = Field( default_factory=lambda: str(uuid.uuid4()), description="Unique ID of the mandate", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_visible": False, "frontend_required": False} ) name: str = Field( description="Name of the mandate", json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": True} ) label: Optional[str] = Field( default=None, description="Display label of the mandate", json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": False} ) enabled: bool = Field( default=True, description="Indicates whether the mandate is enabled", json_schema_extra={"frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False} ) isSystem: bool = Field( default=False, description="Whether this is a system mandate (e.g. root mandate). Cannot be deleted.", json_schema_extra={"frontend_type": "checkbox", "frontend_readonly": True, "frontend_required": False} ) @field_validator('isSystem', mode='before') @classmethod def _coerceIsSystem(cls, v): """Coerce None to False (for existing DB records without isSystem field).""" if v is None: return False return v registerModelLabels( "Mandate", {"en": "Mandate", "de": "Mandant", "fr": "Mandat"}, { "id": {"en": "ID", "de": "ID", "fr": "ID"}, "name": {"en": "Name", "de": "Name", "fr": "Nom"}, "label": {"en": "Label", "de": "Label", "fr": "Libellé"}, "enabled": {"en": "Enabled", "de": "Aktiviert", "fr": "Activé"}, "isSystem": {"en": "System Mandate", "de": "System-Mandant", "fr": "Mandat système"}, }, ) class UserConnection(BaseModel): id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique ID of the connection", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) userId: str = Field(description="ID of the user this connection belongs to", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) authority: AuthAuthority = Field(description="Authentication authority", json_schema_extra={"frontend_type": "select", "frontend_readonly": True, "frontend_required": False, "frontend_options": "/api/connections/authorities/options"}) externalId: str = Field(description="User ID in the external system", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) externalUsername: str = Field(description="Username in the external system", json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": False}) externalEmail: Optional[EmailStr] = Field(None, description="Email in the external system", json_schema_extra={"frontend_type": "email", "frontend_readonly": False, "frontend_required": False}) status: ConnectionStatus = Field(default=ConnectionStatus.ACTIVE, description="Connection status", json_schema_extra={"frontend_type": "select", "frontend_readonly": False, "frontend_required": False, "frontend_options": "/api/connections/statuses/options"}) connectedAt: float = Field(default_factory=getUtcTimestamp, description="When the connection was established (UTC timestamp in seconds)", json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False}) lastChecked: float = Field(default_factory=getUtcTimestamp, description="When the connection was last verified (UTC timestamp in seconds)", json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False}) expiresAt: Optional[float] = Field(None, description="When the connection expires (UTC timestamp in seconds)", json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False}) tokenStatus: Optional[str] = Field(None, description="Current token status: active, expired, none", json_schema_extra={"frontend_type": "select", "frontend_readonly": True, "frontend_required": False, "frontend_options": [ {"value": "active", "label": {"en": "Active", "fr": "Actif"}}, {"value": "expired", "label": {"en": "Expired", "fr": "Expiré"}}, {"value": "none", "label": {"en": "None", "fr": "Aucun"}}, ]}) tokenExpiresAt: Optional[float] = Field(None, description="When the current token expires (UTC timestamp in seconds)", json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False}) grantedScopes: Optional[List[str]] = Field(None, description="OAuth scopes granted for this connection", json_schema_extra={"frontend_type": "list", "frontend_readonly": True, "frontend_required": False}) @computed_field @computed_field @property def connectionReference(self) -> str: """Generate connection reference string in format: connection:{authority}:{username}""" return f"connection:{self.authority.value}:{self.externalUsername}" @computed_field @property def displayLabel(self) -> str: """Human-readable label for display in dropdowns""" authorityLabels = {"msft": "Microsoft", "google": "Google", "local": "Local"} return f"{authorityLabels.get(self.authority.value, self.authority.value)}: {self.externalUsername}" registerModelLabels( "UserConnection", {"en": "User Connection", "de": "Benutzerverbindung", "fr": "Connexion utilisateur"}, { "id": {"en": "ID", "de": "ID", "fr": "ID"}, "userId": {"en": "User ID", "de": "Benutzer-ID", "fr": "ID utilisateur"}, "authority": {"en": "Authority", "de": "Autorität", "fr": "Autorité"}, "externalId": {"en": "External ID", "de": "Externe ID", "fr": "ID externe"}, "externalUsername": {"en": "External Username", "de": "Externer Benutzername", "fr": "Nom d'utilisateur externe"}, "externalEmail": {"en": "External Email", "de": "Externe E-Mail", "fr": "Email externe"}, "status": {"en": "Status", "de": "Status", "fr": "Statut"}, "connectedAt": {"en": "Connected At", "de": "Verbunden am", "fr": "Connecté le"}, "lastChecked": {"en": "Last Checked", "de": "Zuletzt geprüft", "fr": "Dernière vérification"}, "expiresAt": {"en": "Expires At", "de": "Läuft ab am", "fr": "Expire le"}, "tokenStatus": {"en": "Connection Status", "de": "Verbindungsstatus", "fr": "Statut de connexion"}, "tokenExpiresAt": {"en": "Expires At", "de": "Läuft ab am", "fr": "Expire le"}, "grantedScopes": {"en": "Granted Scopes", "de": "Gewährte Berechtigungen", "fr": "Autorisations accordées"}, "connectionReference": {"en": "Connection Reference", "de": "Verbindungsreferenz", "fr": "Référence de connexion"}, "displayLabel": {"en": "Display Label", "de": "Anzeigebezeichnung", "fr": "Libellé d'affichage"}, }, ) class User(BaseModel): """ User model. Multi-Tenant Design: - User gehört NICHT direkt zu einem Mandanten - Zugehörigkeit wird über UserMandate gesteuert (siehe datamodelMembership.py) - Rollen werden über UserMandateRole gesteuert - isSysAdmin = System-Zugriff, KEIN Daten-Zugriff """ id: str = Field( default_factory=lambda: str(uuid.uuid4()), description="Unique ID of the user", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_visible": False, "frontend_required": False} ) username: str = Field( description="Username for login (immutable after creation)", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True} ) email: Optional[EmailStr] = Field( default=None, description="Email address of the user", json_schema_extra={"frontend_type": "email", "frontend_readonly": False, "frontend_required": True} ) fullName: Optional[str] = Field( default=None, description="Full name of the user", json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": False} ) language: str = Field( default="de", description="Preferred language of the user (ISO 639-1 code: de, en, fr, it)", json_schema_extra={"frontend_type": "select", "frontend_readonly": False, "frontend_required": True, "frontend_options": [ {"value": "de", "label": {"en": "Deutsch", "de": "Deutsch", "fr": "Allemand"}}, {"value": "en", "label": {"en": "English", "de": "Englisch", "fr": "Anglais"}}, {"value": "fr", "label": {"en": "Français", "de": "Französisch", "fr": "Français"}}, {"value": "it", "label": {"en": "Italiano", "de": "Italienisch", "fr": "Italien"}}, ]} ) @field_validator('language', mode='before') @classmethod def _normalizeLanguage(cls, v): """Normalize language to valid ISO 639-1 code.""" if v is None: return "de" # Map common variations to standard codes langMap = { 'english': 'en', 'englisch': 'en', 'german': 'de', 'deutsch': 'de', 'french': 'fr', 'französisch': 'fr', 'francais': 'fr', 'italian': 'it', 'italienisch': 'it', 'italiano': 'it', } normalized = str(v).lower().strip() if normalized in langMap: return langMap[normalized] # If already a valid code, return as-is if normalized in ['de', 'en', 'fr', 'it']: return normalized # Default fallback return "de" enabled: bool = Field( default=True, description="Indicates whether the user is enabled", json_schema_extra={"frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False} ) isSysAdmin: bool = Field( default=False, description="Global SysAdmin flag. SysAdmin = System-Zugriff, KEIN Daten-Zugriff!", json_schema_extra={"frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False} ) @field_validator('isSysAdmin', mode='before') @classmethod def _coerceIsSysAdmin(cls, v): """Konvertiert None zu False (für bestehende DB-Einträge ohne isSysAdmin Feld).""" if v is None: return False return v authenticationAuthority: AuthAuthority = Field( default=AuthAuthority.LOCAL, description="Primary authentication authority", json_schema_extra={"frontend_type": "select", "frontend_readonly": True, "frontend_required": False, "frontend_options": "/api/connections/authorities/options"} ) registerModelLabels( "User", {"en": "User", "de": "Benutzer", "fr": "Utilisateur"}, { "id": {"en": "ID", "de": "ID", "fr": "ID"}, "username": {"en": "Username", "de": "Benutzername", "fr": "Nom d'utilisateur"}, "email": {"en": "Email", "de": "E-Mail", "fr": "Email"}, "fullName": {"en": "Full Name", "de": "Vollständiger Name", "fr": "Nom complet"}, "language": {"en": "Language", "de": "Sprache", "fr": "Langue"}, "enabled": {"en": "Enabled", "de": "Aktiviert", "fr": "Activé"}, "isSysAdmin": {"en": "System Admin", "de": "System-Admin", "fr": "Admin système"}, "authenticationAuthority": {"en": "Auth Authority", "de": "Authentifizierung", "fr": "Autorité d'authentification"}, }, ) class UserInDB(User): """User model with password hash for database storage.""" hashedPassword: Optional[str] = Field(None, description="Hash of the user password") resetToken: Optional[str] = Field(None, description="Password reset token (UUID)") resetTokenExpires: Optional[float] = Field(None, description="Reset token expiration (UTC timestamp in seconds)") registerModelLabels( "UserInDB", {"en": "User Access", "de": "Benutzerzugang", "fr": "Accès de l'utilisateur"}, { "hashedPassword": {"en": "Password hash", "de": "Passwort-Hash", "fr": "Hachage de mot de passe"}, "resetToken": {"en": "Reset Token", "de": "Reset-Token", "fr": "Jeton de réinitialisation"}, "resetTokenExpires": {"en": "Reset Token Expires", "de": "Token läuft ab", "fr": "Expiration du jeton"}, }, )