173 lines
6.5 KiB
Python
173 lines
6.5 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Security models: Token and AuthEvent.
|
|
|
|
Multi-Tenant Design:
|
|
- Token ist NICHT an einen Mandanten gebunden
|
|
- User arbeitet parallel in mehreren Mandanten (z.B. mehrere Browser-Tabs)
|
|
- Mandant-Kontext wird per Request-Header (X-Mandate-Id) bestimmt
|
|
"""
|
|
|
|
from typing import Optional, Any
|
|
from pydantic import BaseModel, Field, ConfigDict, model_validator
|
|
from modules.datamodels.datamodelBase import PowerOnModel
|
|
from modules.shared.i18nRegistry import i18nModel
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
from .datamodelUam import AuthAuthority
|
|
from enum import Enum
|
|
import uuid
|
|
|
|
|
|
class TokenStatus(str, Enum):
|
|
ACTIVE = "active"
|
|
REVOKED = "revoked"
|
|
|
|
|
|
class TokenPurpose(str, Enum):
|
|
"""Login/session token vs provider token bound to a UserConnection."""
|
|
|
|
AUTH_SESSION = "authSession"
|
|
DATA_CONNECTION = "dataConnection"
|
|
|
|
|
|
@i18nModel("Token")
|
|
class Token(PowerOnModel):
|
|
"""
|
|
Authentication Token model.
|
|
|
|
Multi-Tenant Design:
|
|
- Token ist User-gebunden, NICHT Mandant-gebunden
|
|
- Ermöglicht parallele Arbeit in mehreren Mandanten
|
|
- Mandant-Kontext wird per Request-Header bestimmt
|
|
"""
|
|
id: Optional[str] = Field(
|
|
default=None,
|
|
json_schema_extra={"label": "ID"},
|
|
)
|
|
userId: str = Field(
|
|
...,
|
|
json_schema_extra={"label": "Benutzer-ID", "fk_target": {"db": "poweron_app", "table": "UserInDB", "labelField": "username"}},
|
|
)
|
|
authority: AuthAuthority = Field(
|
|
...,
|
|
json_schema_extra={"label": "Autoritaet"},
|
|
)
|
|
connectionId: Optional[str] = Field(
|
|
None,
|
|
description="ID of the connection this token belongs to",
|
|
json_schema_extra={"label": "Verbindungs-ID", "fk_target": {"db": "poweron_app", "table": "UserConnection", "labelField": "externalUsername"}},
|
|
)
|
|
tokenPurpose: Optional[TokenPurpose] = Field(
|
|
default=None,
|
|
description="authSession = gateway login JWT; dataConnection = provider OAuth for a connection",
|
|
json_schema_extra={"label": "Token-Verwendung"},
|
|
)
|
|
tokenAccess: str = Field(
|
|
...,
|
|
json_schema_extra={"label": "Zugriffstoken"},
|
|
)
|
|
tokenType: str = Field(
|
|
default="bearer",
|
|
json_schema_extra={"label": "Token-Typ"},
|
|
)
|
|
expiresAt: float = Field(
|
|
description="When the token expires (UTC timestamp in seconds)",
|
|
json_schema_extra={"label": "Laeuft ab am", "frontend_type": "timestamp"},
|
|
)
|
|
tokenRefresh: Optional[str] = Field(
|
|
default=None,
|
|
json_schema_extra={"label": "Refresh-Token"},
|
|
)
|
|
status: TokenStatus = Field(
|
|
default=TokenStatus.ACTIVE,
|
|
description="Token status: active/revoked",
|
|
json_schema_extra={"label": "Status"},
|
|
)
|
|
revokedAt: Optional[float] = Field(
|
|
None,
|
|
description="When the token was revoked (UTC timestamp in seconds)",
|
|
json_schema_extra={"label": "Widerrufen am", "frontend_type": "timestamp"},
|
|
)
|
|
revokedBy: Optional[str] = Field(
|
|
None,
|
|
description="User ID who revoked the token (admin/self)",
|
|
json_schema_extra={"label": "Widerrufen von", "fk_target": {"db": "poweron_app", "table": "UserInDB", "labelField": "username"}},
|
|
)
|
|
reason: Optional[str] = Field(
|
|
None,
|
|
description="Optional revocation reason",
|
|
json_schema_extra={"label": "Grund"},
|
|
)
|
|
sessionId: Optional[str] = Field(
|
|
None,
|
|
description="Logical session grouping for logout revocation",
|
|
json_schema_extra={"label": "Sitzungs-ID"},
|
|
)
|
|
|
|
model_config = ConfigDict(use_enum_values=True)
|
|
|
|
@model_validator(mode="before")
|
|
@classmethod
|
|
def _defaultTokenPurposeFromDb(cls, data: Any) -> Any:
|
|
"""Missing tokenPurpose: connection rows → dataConnection; session rows → authSession."""
|
|
if isinstance(data, dict):
|
|
tp = data.get("tokenPurpose")
|
|
if tp is None or tp == "":
|
|
cid = data.get("connectionId")
|
|
purpose = (
|
|
TokenPurpose.DATA_CONNECTION.value
|
|
if cid
|
|
else TokenPurpose.AUTH_SESSION.value
|
|
)
|
|
data = {**data, "tokenPurpose": purpose}
|
|
return data
|
|
|
|
|
|
@i18nModel("Authentifizierungsereignis")
|
|
class AuthEvent(PowerOnModel):
|
|
"""Authentication event for audit logging."""
|
|
id: str = Field(
|
|
default_factory=lambda: str(uuid.uuid4()),
|
|
description="Unique ID of the auth event",
|
|
json_schema_extra={"label": "ID", "frontend_type": "text", "frontend_readonly": True, "frontend_required": False},
|
|
)
|
|
userId: str = Field(
|
|
description="ID of the user this event belongs to",
|
|
json_schema_extra={
|
|
"label": "Benutzer-ID",
|
|
"frontend_type": "text",
|
|
"frontend_readonly": True,
|
|
"frontend_required": True,
|
|
"fk_target": {"db": "poweron_app", "table": "UserInDB", "labelField": "username"},
|
|
},
|
|
)
|
|
eventType: str = Field(
|
|
description="Type of authentication event (e.g., 'login', 'logout', 'token_refresh')",
|
|
json_schema_extra={"label": "Ereignistyp", "frontend_type": "text", "frontend_readonly": True, "frontend_required": True},
|
|
)
|
|
timestamp: float = Field(
|
|
default_factory=getUtcTimestamp,
|
|
description="Unix timestamp when the event occurred",
|
|
json_schema_extra={"label": "Zeitstempel", "frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": True},
|
|
)
|
|
ipAddress: Optional[str] = Field(
|
|
default=None,
|
|
description="IP address from which the event originated",
|
|
json_schema_extra={"label": "IP-Adresse", "frontend_type": "text", "frontend_readonly": True, "frontend_required": False},
|
|
)
|
|
userAgent: Optional[str] = Field(
|
|
default=None,
|
|
description="User agent string from the request",
|
|
json_schema_extra={"label": "User-Agent", "frontend_type": "text", "frontend_readonly": True, "frontend_required": False},
|
|
)
|
|
success: bool = Field(
|
|
default=True,
|
|
description="Whether the authentication event was successful",
|
|
json_schema_extra={"label": "Erfolgreich", "frontend_type": "boolean", "frontend_readonly": True, "frontend_required": True},
|
|
)
|
|
details: Optional[str] = Field(
|
|
default=None,
|
|
description="Additional details about the event",
|
|
json_schema_extra={"label": "Details", "frontend_type": "text", "frontend_readonly": True, "frontend_required": False},
|
|
)
|