# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Security models: Token and AuthEvent. Multi-Tenant Design: - Token ist NICHT an einen Mandanten gebunden - User arbeitet parallel in mehreren Mandanten (z.B. mehrere Browser-Tabs) - Mandant-Kontext wird per Request-Header (X-Mandate-Id) bestimmt """ from typing import Optional, Any from pydantic import BaseModel, Field, ConfigDict, model_validator from modules.shared.attributeUtils import registerModelLabels from modules.shared.timeUtils import getUtcTimestamp from .datamodelUam import AuthAuthority from enum import Enum import uuid class TokenStatus(str, Enum): ACTIVE = "active" REVOKED = "revoked" class TokenPurpose(str, Enum): """Login/session token vs provider token bound to a UserConnection.""" AUTH_SESSION = "authSession" DATA_CONNECTION = "dataConnection" class Token(BaseModel): """ Authentication Token model. Multi-Tenant Design: - Token ist User-gebunden, NICHT Mandant-gebunden - Ermöglicht parallele Arbeit in mehreren Mandanten - Mandant-Kontext wird per Request-Header bestimmt """ id: Optional[str] = None userId: str authority: AuthAuthority connectionId: Optional[str] = Field( None, description="ID of the connection this token belongs to" ) tokenPurpose: Optional[TokenPurpose] = Field( default=None, description="authSession = gateway login JWT; dataConnection = provider OAuth for a connection", ) tokenAccess: str tokenType: str = "bearer" expiresAt: float = Field( description="When the token expires (UTC timestamp in seconds)" ) tokenRefresh: Optional[str] = None createdAt: Optional[float] = Field( None, description="When the token was created (UTC timestamp in seconds)" ) status: TokenStatus = Field( default=TokenStatus.ACTIVE, description="Token status: active/revoked" ) revokedAt: Optional[float] = Field( None, description="When the token was revoked (UTC timestamp in seconds)" ) revokedBy: Optional[str] = Field( None, description="User ID who revoked the token (admin/self)" ) reason: Optional[str] = Field(None, description="Optional revocation reason") sessionId: Optional[str] = Field( None, description="Logical session grouping for logout revocation" ) # ENTFERNT: mandateId - Token ist nicht mehr Mandant-spezifisch # Mandant-Kontext wird per Request-Header (X-Mandate-Id) bestimmt model_config = ConfigDict(use_enum_values=True) @model_validator(mode="before") @classmethod def _defaultTokenPurposeFromDb(cls, data: Any) -> Any: """Missing tokenPurpose: connection rows → dataConnection; session rows → authSession.""" if isinstance(data, dict): tp = data.get("tokenPurpose") if tp is None or tp == "": cid = data.get("connectionId") purpose = ( TokenPurpose.DATA_CONNECTION.value if cid else TokenPurpose.AUTH_SESSION.value ) data = {**data, "tokenPurpose": purpose} return data registerModelLabels( "Token", {"en": "Token", "de": "Token", "fr": "Jeton"}, { "id": {"en": "ID", "de": "ID", "fr": "ID"}, "userId": {"en": "User ID", "de": "Benutzer-ID", "fr": "ID utilisateur"}, "authority": {"en": "Authority", "de": "Autorität", "fr": "Autorité"}, "connectionId": {"en": "Connection ID", "de": "Verbindungs-ID", "fr": "ID de connexion"}, "tokenPurpose": {"en": "Token purpose", "de": "Token-Verwendung", "fr": "Usage du jeton"}, "tokenAccess": {"en": "Access Token", "de": "Zugriffstoken", "fr": "Jeton d'accès"}, "tokenType": {"en": "Token Type", "de": "Token-Typ", "fr": "Type de jeton"}, "expiresAt": {"en": "Expires At", "de": "Läuft ab am", "fr": "Expire le"}, "tokenRefresh": {"en": "Refresh Token", "de": "Refresh-Token", "fr": "Jeton de rafraîchissement"}, "createdAt": {"en": "Created At", "de": "Erstellt am", "fr": "Créé le"}, "status": {"en": "Status", "de": "Status", "fr": "Statut"}, "revokedAt": {"en": "Revoked At", "de": "Widerrufen am", "fr": "Révoqué le"}, "revokedBy": {"en": "Revoked By", "de": "Widerrufen von", "fr": "Révoqué par"}, "reason": {"en": "Reason", "de": "Grund", "fr": "Raison"}, "sessionId": {"en": "Session ID", "de": "Sitzungs-ID", "fr": "ID de session"}, }, ) class AuthEvent(BaseModel): """Authentication event for audit logging.""" id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique ID of the auth event", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) userId: str = Field(description="ID of the user this event belongs to", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True}) eventType: str = Field(description="Type of authentication event (e.g., 'login', 'logout', 'token_refresh')", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True}) timestamp: float = Field(default_factory=getUtcTimestamp, description="Unix timestamp when the event occurred", json_schema_extra={"frontend_type": "datetime", "frontend_readonly": True, "frontend_required": True}) ipAddress: Optional[str] = Field(default=None, description="IP address from which the event originated", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) userAgent: Optional[str] = Field(default=None, description="User agent string from the request", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) success: bool = Field(default=True, description="Whether the authentication event was successful", json_schema_extra={"frontend_type": "boolean", "frontend_readonly": True, "frontend_required": True}) details: Optional[str] = Field(default=None, description="Additional details about the event", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) registerModelLabels( "AuthEvent", {"en": "Authentication Event", "de": "Authentifizierungsereignis", "fr": "Événement d'authentification"}, { "id": {"en": "ID", "de": "ID", "fr": "ID"}, "userId": {"en": "User ID", "de": "Benutzer-ID", "fr": "ID utilisateur"}, "eventType": {"en": "Event Type", "de": "Ereignistyp", "fr": "Type d'événement"}, "timestamp": {"en": "Timestamp", "de": "Zeitstempel", "fr": "Horodatage"}, "ipAddress": {"en": "IP Address", "de": "IP-Adresse", "fr": "Adresse IP"}, "userAgent": {"en": "User Agent", "de": "User-Agent", "fr": "Agent utilisateur"}, "success": {"en": "Success", "de": "Erfolgreich", "fr": "Succès"}, "details": {"en": "Details", "de": "Details", "fr": "Détails"}, }, )