gateway/modules/datamodels/datamodelSecurity.py
ValueOn AG 0a0973d41b hotfix msft/google login tokens end to end separated from connection
feat(billing): Nutzerhinweise bei leerem Budget + Mandats-Mail (402/SSE)
Gateway
- InsufficientBalanceException: billingModel, userAction (TOP_UP_SELF /
  CONTACT_MANDATE_ADMIN), DE/EN-Texte, toClientDict(), fromBalanceCheck()
- HTTP 402 + JSON detail für globale API-Fehlerbehandlung
- AI/Chatbot: vor Raise ggf. E-Mail an BillingSettings.notifyEmails
  (PREPAY_MANDATE, Throttle 1h/Mandat) via billingExhaustedNotify
- Agent-Loop & Workspace-Route: SSE-ERROR mit strukturiertem Billing-Payload
- datamodelBilling: notifyEmails-Doku für Pool-Alerts
frontend_nyla
- useWorkspace: SSE onError für INSUFFICIENT_BALANCE mit messageDe/En
  und Hinweis auf Billing-Pfad bei TOP_UP_SELF
2026-03-21 01:34:40 +01:00

144 lines
6.9 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Security models: Token and AuthEvent.

Multi-tenant design:
- A token is NOT bound to a mandate (tenant).
- A user can work in several mandates in parallel (e.g. multiple browser tabs).
- The mandate context is determined per request via the X-Mandate-Id header.
"""
from typing import Optional, Any
from pydantic import BaseModel, Field, ConfigDict, model_validator
from modules.shared.attributeUtils import registerModelLabels
from modules.shared.timeUtils import getUtcTimestamp
from .datamodelUam import AuthAuthority
from enum import Enum
import uuid
class TokenStatus(str, Enum):
    """Lifecycle state of a token; members are also plain strings."""

    # Token may still be used.
    ACTIVE = "active"
    # Token has been revoked.
    REVOKED = "revoked"
class TokenPurpose(str, Enum):
    """Distinguishes a login/session token from a provider token tied to a UserConnection."""

    # Login / session token.
    AUTH_SESSION = "authSession"
    # Provider token bound to a UserConnection.
    DATA_CONNECTION = "dataConnection"
class Token(BaseModel):
    """
    Authentication token record.

    Multi-tenant design:
    - A token is bound to a user, NOT to a mandate (tenant).
    - This allows working in several mandates in parallel.
    - The mandate context is determined per request via a request header.
    """

    # Enum-typed fields are populated with the enum's value instead of the member.
    model_config = ConfigDict(use_enum_values=True)

    id: Optional[str] = None
    userId: str
    authority: AuthAuthority
    connectionId: Optional[str] = Field(
        default=None,
        description="ID of the connection this token belongs to",
    )
    # When absent or empty in dict input, this is filled in by
    # _defaultTokenPurposeFromDb below.
    tokenPurpose: Optional[TokenPurpose] = Field(
        default=None,
        description="authSession = gateway login JWT; dataConnection = provider OAuth for a connection",
    )
    tokenAccess: str
    tokenType: str = "bearer"
    # Required: no default is supplied.
    expiresAt: float = Field(
        description="When the token expires (UTC timestamp in seconds)",
    )
    tokenRefresh: Optional[str] = None
    createdAt: Optional[float] = Field(
        default=None,
        description="When the token was created (UTC timestamp in seconds)",
    )
    # NOTE(review): per the Pydantic docs, use_enum_values is not applied to
    # unvalidated defaults, so a defaulted status may remain a TokenStatus
    # member while parsed values become plain strings — confirm whether
    # validate_default=True is wanted here.
    status: TokenStatus = Field(
        default=TokenStatus.ACTIVE,
        description="Token status: active/revoked",
    )
    revokedAt: Optional[float] = Field(
        default=None,
        description="When the token was revoked (UTC timestamp in seconds)",
    )
    revokedBy: Optional[str] = Field(
        default=None,
        description="User ID who revoked the token (admin/self)",
    )
    reason: Optional[str] = Field(
        default=None,
        description="Optional revocation reason",
    )
    sessionId: Optional[str] = Field(
        default=None,
        description="Logical session grouping for logout revocation",
    )

    # REMOVED: mandateId - a token is no longer mandate-specific.
    # The mandate context is determined per request via the X-Mandate-Id header.

    @model_validator(mode="before")
    @classmethod
    def _defaultTokenPurposeFromDb(cls, data: Any) -> Any:
        """
        Fill in a missing tokenPurpose before field validation.

        Only dict input is touched. When "tokenPurpose" is None or an empty
        string, it is set to "dataConnection" if "connectionId" is truthy and
        to "authSession" otherwise. The input dict itself is not mutated; a
        shallow copy carrying the inferred value is returned instead.
        """
        # Non-dict input is passed through unchanged.
        if not isinstance(data, dict):
            return data

        # An explicit, non-empty purpose is respected as-is.
        existingPurpose = data.get("tokenPurpose")
        if existingPurpose is not None and existingPurpose != "":
            return data

        # Rows carrying a connection id are provider tokens; the rest are sessions.
        if data.get("connectionId"):
            inferredPurpose = TokenPurpose.DATA_CONNECTION.value
        else:
            inferredPurpose = TokenPurpose.AUTH_SESSION.value
        return {**data, "tokenPurpose": inferredPurpose}
# Hand the en/de/fr display labels for the Token model and each of its fields
# to registerModelLabels.
# NOTE(review): how the labels are stored/consumed is defined in
# modules.shared.attributeUtils and is not visible from this file.
registerModelLabels(
    "Token",
    {
        "en": "Token",
        "de": "Token",
        "fr": "Jeton",
    },
    {
        "id": {"en": "ID", "de": "ID", "fr": "ID"},
        "userId": {"en": "User ID", "de": "Benutzer-ID", "fr": "ID utilisateur"},
        "authority": {"en": "Authority", "de": "Autorität", "fr": "Autorité"},
        "connectionId": {"en": "Connection ID", "de": "Verbindungs-ID", "fr": "ID de connexion"},
        "tokenPurpose": {"en": "Token purpose", "de": "Token-Verwendung", "fr": "Usage du jeton"},
        "tokenAccess": {"en": "Access Token", "de": "Zugriffstoken", "fr": "Jeton d'accès"},
        "tokenType": {"en": "Token Type", "de": "Token-Typ", "fr": "Type de jeton"},
        "expiresAt": {"en": "Expires At", "de": "Läuft ab am", "fr": "Expire le"},
        "tokenRefresh": {"en": "Refresh Token", "de": "Refresh-Token", "fr": "Jeton de rafraîchissement"},
        "createdAt": {"en": "Created At", "de": "Erstellt am", "fr": "Créé le"},
        "status": {"en": "Status", "de": "Status", "fr": "Statut"},
        "revokedAt": {"en": "Revoked At", "de": "Widerrufen am", "fr": "Révoqué le"},
        "revokedBy": {"en": "Revoked By", "de": "Widerrufen von", "fr": "Révoqué par"},
        "reason": {"en": "Reason", "de": "Grund", "fr": "Raison"},
        "sessionId": {"en": "Session ID", "de": "Sitzungs-ID", "fr": "ID de session"},
    },
)
class AuthEvent(BaseModel):
    """
    Audit-log record describing a single authentication event.

    Every field carries json_schema_extra hints (frontend_type,
    frontend_readonly, frontend_required); all fields are marked read-only.
    """

    # Generated as a random UUID4 string when not supplied.
    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Unique ID of the auth event",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    userId: str = Field(
        description="ID of the user this event belongs to",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": True,
        },
    )
    eventType: str = Field(
        description="Type of authentication event (e.g., 'login', 'logout', 'token_refresh')",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": True,
        },
    )
    # Defaults to the value returned by getUtcTimestamp() at creation time.
    timestamp: float = Field(
        default_factory=getUtcTimestamp,
        description="Unix timestamp when the event occurred",
        json_schema_extra={
            "frontend_type": "datetime",
            "frontend_readonly": True,
            "frontend_required": True,
        },
    )
    ipAddress: Optional[str] = Field(
        default=None,
        description="IP address from which the event originated",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    userAgent: Optional[str] = Field(
        default=None,
        description="User agent string from the request",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    success: bool = Field(
        default=True,
        description="Whether the authentication event was successful",
        json_schema_extra={
            "frontend_type": "boolean",
            "frontend_readonly": True,
            "frontend_required": True,
        },
    )
    details: Optional[str] = Field(
        default=None,
        description="Additional details about the event",
        json_schema_extra={
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
# Hand the en/de/fr display labels for the AuthEvent model and each of its
# fields to registerModelLabels.
# NOTE(review): how the labels are stored/consumed is defined in
# modules.shared.attributeUtils and is not visible from this file.
registerModelLabels(
    "AuthEvent",
    {
        "en": "Authentication Event",
        "de": "Authentifizierungsereignis",
        "fr": "Événement d'authentification",
    },
    {
        "id": {"en": "ID", "de": "ID", "fr": "ID"},
        "userId": {"en": "User ID", "de": "Benutzer-ID", "fr": "ID utilisateur"},
        "eventType": {"en": "Event Type", "de": "Ereignistyp", "fr": "Type d'événement"},
        "timestamp": {"en": "Timestamp", "de": "Zeitstempel", "fr": "Horodatage"},
        "ipAddress": {"en": "IP Address", "de": "IP-Adresse", "fr": "Adresse IP"},
        "userAgent": {"en": "User Agent", "de": "User-Agent", "fr": "Agent utilisateur"},
        "success": {"en": "Success", "de": "Erfolgreich", "fr": "Succès"},
        "details": {"en": "Details", "de": "Details", "fr": "Détails"},
    },
)