# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Security models: Token and AuthEvent.

Multi-tenant design:
- A token is NOT bound to a tenant (mandate).
- A user can work in several tenants in parallel (e.g. multiple browser tabs).
- The tenant context is determined per request via the X-Mandate-Id header.
"""

import uuid
from enum import Enum
from typing import Optional, Any

from pydantic import BaseModel, Field, ConfigDict, model_validator

from modules.datamodels.datamodelBase import PowerOnModel
from modules.shared.i18nRegistry import i18nModel
from modules.shared.timeUtils import getUtcTimestamp
from .datamodelUam import AuthAuthority


class TokenStatus(str, Enum):
    """Lifecycle state of a token: active or revoked."""

    ACTIVE = "active"
    REVOKED = "revoked"


class TokenPurpose(str, Enum):
    """Distinguishes a login/session token from a provider token bound to a UserConnection."""

    AUTH_SESSION = "authSession"
    DATA_CONNECTION = "dataConnection"


@i18nModel("Token")
class Token(PowerOnModel):
    """
    Authentication token model.

    Multi-tenant design:
    - The token is bound to a user, NOT to a tenant (mandate).
    - This allows working in several tenants in parallel.
    - The tenant context is determined per request header.
    """

    id: Optional[str] = Field(
        default=None,
        json_schema_extra={"label": "ID"},
    )
    userId: str = Field(
        ...,
        json_schema_extra={"label": "Benutzer-ID"},
    )
    authority: AuthAuthority = Field(
        ...,
        json_schema_extra={"label": "Autoritaet"},
    )
    connectionId: Optional[str] = Field(
        default=None,
        description="ID of the connection this token belongs to",
        json_schema_extra={"label": "Verbindungs-ID"},
    )
    # Left as None by callers is fine: the before-validator below fills it in
    # for dict input based on whether connectionId is set.
    tokenPurpose: Optional[TokenPurpose] = Field(
        default=None,
        description="authSession = gateway login JWT; dataConnection = provider OAuth for a connection",
        json_schema_extra={"label": "Token-Verwendung"},
    )
    tokenAccess: str = Field(
        ...,
        json_schema_extra={"label": "Zugriffstoken"},
    )
    tokenType: str = Field(
        default="bearer",
        json_schema_extra={"label": "Token-Typ"},
    )
    # Required field: no default is supplied.
    expiresAt: float = Field(
        description="When the token expires (UTC timestamp in seconds)",
        json_schema_extra={"label": "Laeuft ab am"},
    )
    tokenRefresh: Optional[str] = Field(
        default=None,
        json_schema_extra={"label": "Refresh-Token"},
    )
    status: TokenStatus = Field(
        default=TokenStatus.ACTIVE,
        description="Token status: active/revoked",
        json_schema_extra={"label": "Status"},
    )
    revokedAt: Optional[float] = Field(
        default=None,
        description="When the token was revoked (UTC timestamp in seconds)",
        json_schema_extra={"label": "Widerrufen am"},
    )
    revokedBy: Optional[str] = Field(
        default=None,
        description="User ID who revoked the token (admin/self)",
        json_schema_extra={"label": "Widerrufen von"},
    )
    reason: Optional[str] = Field(
        default=None,
        description="Optional revocation reason",
        json_schema_extra={"label": "Grund"},
    )
    sessionId: Optional[str] = Field(
        default=None,
        description="Logical session grouping for logout revocation",
        json_schema_extra={"label": "Sitzungs-ID"},
    )

    # Enum-typed fields are stored as their plain values once validated.
    # NOTE(review): Pydantic only applies use_enum_values during validation, so
    # a defaulted `status` may remain a TokenStatus member unless
    # validate_default is enabled somewhere (e.g. in PowerOnModel) - confirm.
    model_config = ConfigDict(use_enum_values=True)

    @model_validator(mode="before")
    @classmethod
    def _defaultTokenPurposeFromDb(cls, data: Any) -> Any:
        """Fill in a missing tokenPurpose before field validation runs.

        Dict input whose "tokenPurpose" is None or "" is replaced by a copy in
        which "tokenPurpose" is "dataConnection" when "connectionId" is truthy
        and "authSession" otherwise. The caller's dict is not mutated.
        Non-dict input, and dicts that already carry a purpose, are returned
        unchanged.
        """
        if not isinstance(data, dict):
            return data

        currentPurpose = data.get("tokenPurpose")
        purposeMissing = currentPurpose is None or currentPurpose == ""
        if not purposeMissing:
            return data

        # Rows tied to a connection are provider tokens; everything else is a
        # login/session token.
        if data.get("connectionId"):
            inferred = TokenPurpose.DATA_CONNECTION
        else:
            inferred = TokenPurpose.AUTH_SESSION
        return {**data, "tokenPurpose": inferred.value}


@i18nModel("Authentifizierungsereignis")
class AuthEvent(PowerOnModel):
    """Authentication event record used for audit logging."""

    # Generated as a fresh UUID4 string when not supplied.
    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Unique ID of the auth event",
        json_schema_extra={
            "label": "ID",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    userId: str = Field(
        description="ID of the user this event belongs to",
        json_schema_extra={
            "label": "Benutzer-ID",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": True,
        },
    )
    eventType: str = Field(
        description="Type of authentication event (e.g., 'login', 'logout', 'token_refresh')",
        json_schema_extra={
            "label": "Ereignistyp",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": True,
        },
    )
    # Defaults to the value returned by getUtcTimestamp() at creation time.
    timestamp: float = Field(
        default_factory=getUtcTimestamp,
        description="Unix timestamp when the event occurred",
        json_schema_extra={
            "label": "Zeitstempel",
            "frontend_type": "datetime",
            "frontend_readonly": True,
            "frontend_required": True,
        },
    )
    ipAddress: Optional[str] = Field(
        default=None,
        description="IP address from which the event originated",
        json_schema_extra={
            "label": "IP-Adresse",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    userAgent: Optional[str] = Field(
        default=None,
        description="User agent string from the request",
        json_schema_extra={
            "label": "User-Agent",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    success: bool = Field(
        default=True,
        description="Whether the authentication event was successful",
        json_schema_extra={
            "label": "Erfolgreich",
            "frontend_type": "boolean",
            "frontend_readonly": True,
            "frontend_required": True,
        },
    )
    details: Optional[str] = Field(
        default=None,
        description="Additional details about the event",
        json_schema_extra={
            "label": "Details",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )