gateway/modules/datamodels/datamodelAudit.py
2026-01-19 09:18:37 +01:00

208 lines
7.9 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Audit Log Data Model for database-based audit logging.
This model stores security-relevant audit events for GDPR compliance and security monitoring.
GDPR-Relevant Events:
- User access: login, logout, failed login attempts
- Data access: create, read, update, delete operations on personal data
- Security events: password changes, token refresh, session management
- Key access: encryption/decryption of sensitive data
- GDPR actions: data export, data portability, account deletion
- Mandate/permission changes: user added/removed from mandates, role changes
"""
from typing import Optional
from pydantic import BaseModel, Field
from enum import Enum
import uuid
from modules.shared.timeUtils import getUtcTimestamp
from modules.shared.attributeUtils import registerModelLabels
class AuditCategory(str, Enum):
"""Categories for audit log entries"""
ACCESS = "access" # Login/logout events
KEY = "key" # Encryption key access
DATA = "data" # Data CRUD operations
SECURITY = "security" # Security-related events
GDPR = "gdpr" # GDPR-specific actions
PERMISSION = "permission" # Permission/role changes
SYSTEM = "system" # System-level events
class AuditAction(str, Enum):
"""Actions for audit log entries"""
# Access actions
LOGIN = "login"
LOGIN_FAILED = "login_failed"
LOGOUT = "logout"
TOKEN_REFRESH = "token_refresh"
TOKEN_REVOKE = "token_revoke"
SESSION_EXPIRED = "session_expired"
# Key actions
KEY_ENCODE = "encode"
KEY_DECODE = "decode"
KEY_ACCESS = "key_access"
# Data actions
DATA_CREATE = "create"
DATA_READ = "read"
DATA_UPDATE = "update"
DATA_DELETE = "delete"
DATA_EXPORT = "export"
# Security actions
PASSWORD_CHANGE = "password_change"
PASSWORD_RESET = "password_reset"
MFA_ENABLED = "mfa_enabled"
MFA_DISABLED = "mfa_disabled"
# GDPR actions
GDPR_DATA_EXPORT = "gdpr_data_export"
GDPR_DATA_PORTABILITY = "gdpr_data_portability"
GDPR_ACCOUNT_DELETION = "gdpr_account_deletion"
GDPR_CONSENT_UPDATE = "gdpr_consent_update"
# Permission actions
USER_ADDED_TO_MANDATE = "user_added_to_mandate"
USER_REMOVED_FROM_MANDATE = "user_removed_from_mandate"
ROLE_ASSIGNED = "role_assigned"
ROLE_REVOKED = "role_revoked"
FEATURE_ACCESS_GRANTED = "feature_access_granted"
FEATURE_ACCESS_REVOKED = "feature_access_revoked"
# System actions
SYSTEM_STARTUP = "system_startup"
SYSTEM_SHUTDOWN = "system_shutdown"
CONFIG_CHANGE = "config_change"
class AuditLogEntry(BaseModel):
"""
Audit log entry for database storage.
Stores all security-relevant events for compliance and monitoring.
Entries are immutable once created (append-only audit trail).
"""
id: str = Field(
default_factory=lambda: str(uuid.uuid4()),
description="Unique identifier for the audit entry",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}
)
# Timestamp
timestamp: float = Field(
default_factory=getUtcTimestamp,
description="UTC timestamp when the event occurred",
json_schema_extra={"frontend_type": "datetime", "frontend_readonly": True, "frontend_required": True}
)
# Actor identification
userId: str = Field(
description="ID of the user who performed the action (or 'system' for system events)",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True}
)
username: Optional[str] = Field(
default=None,
description="Username at the time of the event (for historical reference)",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}
)
# Context
mandateId: Optional[str] = Field(
default=None,
description="Mandate context (if applicable)",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}
)
featureInstanceId: Optional[str] = Field(
default=None,
description="Feature instance context (if applicable)",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}
)
# Event classification
category: str = Field(
description="Event category (access, key, data, security, gdpr, permission, system)",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True}
)
action: str = Field(
description="Specific action performed",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": True}
)
# Event details
resourceType: Optional[str] = Field(
default=None,
description="Type of resource affected (e.g., 'User', 'ChatWorkflow', 'TrusteeContract')",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}
)
resourceId: Optional[str] = Field(
default=None,
description="ID of the affected resource",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}
)
details: Optional[str] = Field(
default=None,
description="Additional details about the event",
json_schema_extra={"frontend_type": "textarea", "frontend_readonly": True, "frontend_required": False}
)
# Request metadata
ipAddress: Optional[str] = Field(
default=None,
description="IP address of the client",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}
)
userAgent: Optional[str] = Field(
default=None,
description="User agent string from the request",
json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}
)
# Outcome
success: bool = Field(
default=True,
description="Whether the action was successful",
json_schema_extra={"frontend_type": "checkbox", "frontend_readonly": True, "frontend_required": True}
)
errorMessage: Optional[str] = Field(
default=None,
description="Error message if the action failed",
json_schema_extra={"frontend_type": "textarea", "frontend_readonly": True, "frontend_required": False}
)
# Register labels for internationalization
registerModelLabels(
"AuditLogEntry",
{"en": "Audit Log Entry", "de": "Audit-Log-Eintrag", "fr": "Entrée du journal d'audit"},
{
"id": {"en": "ID", "de": "ID", "fr": "ID"},
"timestamp": {"en": "Timestamp", "de": "Zeitstempel", "fr": "Horodatage"},
"userId": {"en": "User ID", "de": "Benutzer-ID", "fr": "ID utilisateur"},
"username": {"en": "Username", "de": "Benutzername", "fr": "Nom d'utilisateur"},
"mandateId": {"en": "Mandate ID", "de": "Mandanten-ID", "fr": "ID du mandat"},
"featureInstanceId": {"en": "Feature Instance ID", "de": "Feature-Instanz-ID", "fr": "ID de l'instance"},
"category": {"en": "Category", "de": "Kategorie", "fr": "Catégorie"},
"action": {"en": "Action", "de": "Aktion", "fr": "Action"},
"resourceType": {"en": "Resource Type", "de": "Ressourcentyp", "fr": "Type de ressource"},
"resourceId": {"en": "Resource ID", "de": "Ressourcen-ID", "fr": "ID de ressource"},
"details": {"en": "Details", "de": "Details", "fr": "Détails"},
"ipAddress": {"en": "IP Address", "de": "IP-Adresse", "fr": "Adresse IP"},
"userAgent": {"en": "User Agent", "de": "User-Agent", "fr": "Agent utilisateur"},
"success": {"en": "Success", "de": "Erfolgreich", "fr": "Succès"},
"errorMessage": {"en": "Error Message", "de": "Fehlermeldung", "fr": "Message d'erreur"},
},
)