gateway/modules/datamodels/datamodelSecurity.py
2026-04-10 12:33:27 +02:00

167 lines
6 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Security models: Token and AuthEvent.
Multi-Tenant Design:
- Token ist NICHT an einen Mandanten gebunden
- User arbeitet parallel in mehreren Mandanten (z.B. mehrere Browser-Tabs)
- Mandant-Kontext wird per Request-Header (X-Mandate-Id) bestimmt
"""
from typing import Optional, Any
from pydantic import BaseModel, Field, ConfigDict, model_validator
from modules.datamodels.datamodelBase import PowerOnModel
from modules.shared.i18nRegistry import i18nModel
from modules.shared.timeUtils import getUtcTimestamp
from .datamodelUam import AuthAuthority
from enum import Enum
import uuid
class TokenStatus(str, Enum):
ACTIVE = "active"
REVOKED = "revoked"
class TokenPurpose(str, Enum):
"""Login/session token vs provider token bound to a UserConnection."""
AUTH_SESSION = "authSession"
DATA_CONNECTION = "dataConnection"
@i18nModel("Token")
class Token(PowerOnModel):
"""
Authentication Token model.
Multi-Tenant Design:
- Token ist User-gebunden, NICHT Mandant-gebunden
- Ermöglicht parallele Arbeit in mehreren Mandanten
- Mandant-Kontext wird per Request-Header bestimmt
"""
id: Optional[str] = Field(
default=None,
json_schema_extra={"label": "ID"},
)
userId: str = Field(
...,
json_schema_extra={"label": "Benutzer-ID"},
)
authority: AuthAuthority = Field(
...,
json_schema_extra={"label": "Autoritaet"},
)
connectionId: Optional[str] = Field(
None,
description="ID of the connection this token belongs to",
json_schema_extra={"label": "Verbindungs-ID"},
)
tokenPurpose: Optional[TokenPurpose] = Field(
default=None,
description="authSession = gateway login JWT; dataConnection = provider OAuth for a connection",
json_schema_extra={"label": "Token-Verwendung"},
)
tokenAccess: str = Field(
...,
json_schema_extra={"label": "Zugriffstoken"},
)
tokenType: str = Field(
default="bearer",
json_schema_extra={"label": "Token-Typ"},
)
expiresAt: float = Field(
description="When the token expires (UTC timestamp in seconds)",
json_schema_extra={"label": "Laeuft ab am"},
)
tokenRefresh: Optional[str] = Field(
default=None,
json_schema_extra={"label": "Refresh-Token"},
)
status: TokenStatus = Field(
default=TokenStatus.ACTIVE,
description="Token status: active/revoked",
json_schema_extra={"label": "Status"},
)
revokedAt: Optional[float] = Field(
None,
description="When the token was revoked (UTC timestamp in seconds)",
json_schema_extra={"label": "Widerrufen am"},
)
revokedBy: Optional[str] = Field(
None,
description="User ID who revoked the token (admin/self)",
json_schema_extra={"label": "Widerrufen von"},
)
reason: Optional[str] = Field(
None,
description="Optional revocation reason",
json_schema_extra={"label": "Grund"},
)
sessionId: Optional[str] = Field(
None,
description="Logical session grouping for logout revocation",
json_schema_extra={"label": "Sitzungs-ID"},
)
model_config = ConfigDict(use_enum_values=True)
@model_validator(mode="before")
@classmethod
def _defaultTokenPurposeFromDb(cls, data: Any) -> Any:
"""Missing tokenPurpose: connection rows → dataConnection; session rows → authSession."""
if isinstance(data, dict):
tp = data.get("tokenPurpose")
if tp is None or tp == "":
cid = data.get("connectionId")
purpose = (
TokenPurpose.DATA_CONNECTION.value
if cid
else TokenPurpose.AUTH_SESSION.value
)
data = {**data, "tokenPurpose": purpose}
return data
@i18nModel("Authentifizierungsereignis")
class AuthEvent(PowerOnModel):
"""Authentication event for audit logging."""
id: str = Field(
default_factory=lambda: str(uuid.uuid4()),
description="Unique ID of the auth event",
json_schema_extra={"label": "ID", "frontend_type": "text", "frontend_readonly": True, "frontend_required": False},
)
userId: str = Field(
description="ID of the user this event belongs to",
json_schema_extra={"label": "Benutzer-ID", "frontend_type": "text", "frontend_readonly": True, "frontend_required": True},
)
eventType: str = Field(
description="Type of authentication event (e.g., 'login', 'logout', 'token_refresh')",
json_schema_extra={"label": "Ereignistyp", "frontend_type": "text", "frontend_readonly": True, "frontend_required": True},
)
timestamp: float = Field(
default_factory=getUtcTimestamp,
description="Unix timestamp when the event occurred",
json_schema_extra={"label": "Zeitstempel", "frontend_type": "datetime", "frontend_readonly": True, "frontend_required": True},
)
ipAddress: Optional[str] = Field(
default=None,
description="IP address from which the event originated",
json_schema_extra={"label": "IP-Adresse", "frontend_type": "text", "frontend_readonly": True, "frontend_required": False},
)
userAgent: Optional[str] = Field(
default=None,
description="User agent string from the request",
json_schema_extra={"label": "User-Agent", "frontend_type": "text", "frontend_readonly": True, "frontend_required": False},
)
success: bool = Field(
default=True,
description="Whether the authentication event was successful",
json_schema_extra={"label": "Erfolgreich", "frontend_type": "boolean", "frontend_readonly": True, "frontend_required": True},
)
details: Optional[str] = Field(
default=None,
description="Additional details about the event",
json_schema_extra={"label": "Details", "frontend_type": "text", "frontend_readonly": True, "frontend_required": False},
)