gateway/modules/shared/auditLogger.py
2026-01-19 09:18:37 +01:00

474 lines
16 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Audit Logging System for PowerOn Gateway
This module provides centralized audit logging functionality for security events,
user actions, and system access patterns. Logs are stored in the database for
GDPR compliance and security monitoring.
GDPR Requirements Addressed:
- Article 5(1)(f): Integrity and confidentiality - secure audit trail
- Article 17: Right to erasure - audit log retention with automatic cleanup
- Article 30: Records of processing activities - comprehensive event logging
"""
import logging
from datetime import datetime
from typing import Optional, Dict, Any
from modules.shared.configuration import APP_CONFIG
from modules.shared.timeUtils import getUtcTimestamp
logger = logging.getLogger(__name__)
class AuditLogger:
"""
Centralized audit logging system with database storage.
Logs security-relevant events to PostgreSQL for:
- GDPR compliance
- Security monitoring
- Access tracking
- Incident investigation
"""
def __init__(self):
self._db = None
self._modelClass = None
self._initialized = False
self._fallbackToStdout = True
def _ensureInitialized(self) -> bool:
"""Lazily initialize database connection to avoid circular imports."""
if self._initialized:
return self._db is not None
self._initialized = True
try:
from modules.datamodels.datamodelAudit import AuditLogEntry
from modules.connectors.connectorDbPostgre import DatabaseConnector
self._modelClass = AuditLogEntry
# Get database configuration
dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
dbDatabase = "poweron_app" # Store audit logs in the main app database
dbUser = APP_CONFIG.get("DB_USER")
dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
dbPort = int(APP_CONFIG.get("DB_PORT", 5432))
# Create database connector with system user context
self._db = DatabaseConnector(
dbHost=dbHost,
dbDatabase=dbDatabase,
dbUser=dbUser,
dbPassword=dbPassword,
dbPort=dbPort,
userId="system" # Audit logs are created by system
)
# Initialize database and ensure table exists
self._db.initDbSystem()
self._db._ensureTableExists(AuditLogEntry)
logger.info("AuditLogger database connection initialized successfully")
return True
except Exception as e:
logger.warning(f"AuditLogger database initialization failed, using fallback logging: {e}")
self._db = None
return False
def _logToFallback(self, entry: Dict[str, Any]) -> None:
"""Log to standard logger as fallback when database is unavailable."""
if self._fallbackToStdout:
fallbackMsg = (
f"AUDIT | {entry.get('timestamp', '')} | "
f"{entry.get('userId', '')} | {entry.get('mandateId', '')} | "
f"{entry.get('category', '')} | {entry.get('action', '')} | "
f"{entry.get('details', '')}"
)
logging.getLogger('audit.fallback').info(fallbackMsg)
def logEvent(
self,
userId: str,
mandateId: Optional[str] = None,
category: str = "system",
action: str = "",
details: str = "",
featureInstanceId: Optional[str] = None,
resourceType: Optional[str] = None,
resourceId: Optional[str] = None,
ipAddress: Optional[str] = None,
userAgent: Optional[str] = None,
success: bool = True,
errorMessage: Optional[str] = None,
username: Optional[str] = None,
timestamp: Optional[float] = None
) -> Optional[str]:
"""
Log an audit event to the database.
Args:
userId: User identifier (or 'system' for system events)
mandateId: Mandate context (can be None for system-level events)
category: Event category (access, key, data, security, gdpr, permission, system)
action: Specific action performed
details: Additional details about the event
featureInstanceId: Feature instance context (if applicable)
resourceType: Type of resource affected
resourceId: ID of the affected resource
ipAddress: Client IP address
userAgent: Client user agent
success: Whether the action was successful
errorMessage: Error message if action failed
username: Username at the time of event (for historical reference)
timestamp: Optional custom timestamp (defaults to current time)
Returns:
ID of the created audit log entry, or None if logging failed
"""
try:
# Prepare the entry data
entryData = {
"timestamp": timestamp if timestamp else getUtcTimestamp(),
"userId": userId or "unknown",
"username": username,
"mandateId": mandateId,
"featureInstanceId": featureInstanceId,
"category": category,
"action": action,
"resourceType": resourceType,
"resourceId": resourceId,
"details": details if details else None,
"ipAddress": ipAddress,
"userAgent": userAgent,
"success": success,
"errorMessage": errorMessage
}
# Try to write to database
if self._ensureInitialized() and self._db:
from modules.datamodels.datamodelAudit import AuditLogEntry
entry = AuditLogEntry(**entryData)
created = self._db.recordCreate(AuditLogEntry, entry.model_dump())
if created and created.get("id"):
return created["id"]
else:
self._logToFallback(entryData)
return None
else:
# Use fallback logging
self._logToFallback(entryData)
return None
except Exception as e:
logger.error(f"Failed to log audit event: {e}")
# Try fallback
try:
self._logToFallback(entryData)
except Exception:
pass
return None
# ===== Convenience Methods for Common Event Types =====
def logKeyAccess(
self,
userId: str,
mandateId: str,
keyName: str,
action: str,
ipAddress: Optional[str] = None
) -> Optional[str]:
"""Log key access events (encode/decode)."""
return self.logEvent(
userId=userId,
mandateId=mandateId,
category="key",
action=action,
details=f"Key: {keyName}",
resourceType="EncryptionKey",
resourceId=keyName,
ipAddress=ipAddress
)
def logUserAccess(
self,
userId: str,
mandateId: str,
action: str,
successInfo: str = "",
ipAddress: Optional[str] = None,
userAgent: Optional[str] = None,
success: bool = True
) -> Optional[str]:
"""Log user access events (login/logout)."""
return self.logEvent(
userId=userId,
mandateId=mandateId,
category="access",
action=action,
details=successInfo,
ipAddress=ipAddress,
userAgent=userAgent,
success=success
)
def logDataAccess(
self,
userId: str,
mandateId: str,
action: str,
details: str = "",
resourceType: Optional[str] = None,
resourceId: Optional[str] = None,
featureInstanceId: Optional[str] = None
) -> Optional[str]:
"""Log data access events (CRUD operations)."""
return self.logEvent(
userId=userId,
mandateId=mandateId,
category="data",
action=action,
details=details,
resourceType=resourceType,
resourceId=resourceId,
featureInstanceId=featureInstanceId
)
def logSecurityEvent(
self,
userId: str,
mandateId: str,
action: str,
details: str = "",
ipAddress: Optional[str] = None,
success: bool = True,
errorMessage: Optional[str] = None
) -> Optional[str]:
"""Log security-related events."""
return self.logEvent(
userId=userId,
mandateId=mandateId,
category="security",
action=action,
details=details,
ipAddress=ipAddress,
success=success,
errorMessage=errorMessage
)
def logGdprEvent(
self,
userId: str,
mandateId: str,
action: str,
details: str = "",
ipAddress: Optional[str] = None
) -> Optional[str]:
"""Log GDPR-specific events (data export, deletion, etc.)."""
return self.logEvent(
userId=userId,
mandateId=mandateId,
category="gdpr",
action=action,
details=details,
ipAddress=ipAddress
)
def logPermissionChange(
self,
userId: str,
mandateId: str,
action: str,
targetUserId: str,
details: str = "",
resourceType: Optional[str] = None,
resourceId: Optional[str] = None
) -> Optional[str]:
"""Log permission/role changes."""
return self.logEvent(
userId=userId,
mandateId=mandateId,
category="permission",
action=action,
details=f"Target user: {targetUserId}. {details}",
resourceType=resourceType,
resourceId=resourceId
)
# ===== Audit Log Query Methods =====
def getAuditLogs(
self,
userId: Optional[str] = None,
mandateId: Optional[str] = None,
category: Optional[str] = None,
action: Optional[str] = None,
fromTimestamp: Optional[float] = None,
toTimestamp: Optional[float] = None,
limit: int = 100
) -> list:
"""
Query audit logs from database.
Args:
userId: Filter by user ID
mandateId: Filter by mandate ID
category: Filter by category
action: Filter by action
fromTimestamp: Filter events after this timestamp
toTimestamp: Filter events before this timestamp
limit: Maximum number of records to return
Returns:
List of audit log entries
"""
if not self._ensureInitialized() or not self._db:
return []
try:
from modules.datamodels.datamodelAudit import AuditLogEntry
# Build filter
recordFilter = {}
if userId:
recordFilter["userId"] = userId
if mandateId:
recordFilter["mandateId"] = mandateId
if category:
recordFilter["category"] = category
if action:
recordFilter["action"] = action
# Query database
records = self._db.getRecordset(
AuditLogEntry,
recordFilter=recordFilter if recordFilter else None,
orderBy="timestamp DESC"
)
# Apply timestamp filtering in Python (PostgreSQL connector may not support $gt/$lt)
if fromTimestamp or toTimestamp:
filteredRecords = []
for record in records:
ts = record.get("timestamp", 0)
if fromTimestamp and ts < fromTimestamp:
continue
if toTimestamp and ts > toTimestamp:
continue
filteredRecords.append(record)
records = filteredRecords
# Apply limit
return records[:limit]
except Exception as e:
logger.error(f"Failed to query audit logs: {e}")
return []
# ===== Cleanup Methods =====
def cleanupOldEntries(self, retentionDays: int = 365) -> int:
"""
Remove audit log entries older than the retention period.
GDPR Note: Audit logs should be retained for a reasonable period
for security and compliance purposes, but not indefinitely.
Default retention is 1 year (365 days).
Args:
retentionDays: Number of days to retain audit logs
Returns:
Number of entries deleted
"""
if not self._ensureInitialized() or not self._db:
logger.warning("Cannot cleanup audit logs: database not initialized")
return 0
try:
from modules.datamodels.datamodelAudit import AuditLogEntry
import time
# Calculate cutoff timestamp
cutoffTimestamp = time.time() - (retentionDays * 24 * 60 * 60)
# Query old entries
allRecords = self._db.getRecordset(AuditLogEntry)
oldRecords = [r for r in allRecords if r.get("timestamp", 0) < cutoffTimestamp]
# Delete old entries
deletedCount = 0
for record in oldRecords:
recordId = record.get("id")
if recordId:
if self._db.recordDelete(AuditLogEntry, recordId):
deletedCount += 1
logger.info(f"Audit log cleanup: removed {deletedCount} entries older than {retentionDays} days")
# Log the cleanup action itself
self.logEvent(
userId="system",
mandateId="system",
category="system",
action="audit_cleanup",
details=f"Removed {deletedCount} entries older than {retentionDays} days"
)
return deletedCount
except Exception as e:
logger.error(f"Failed to cleanup audit logs: {e}")
return 0
# Global audit logger instance
audit_logger = AuditLogger()
# ===== Scheduler Integration =====
async def runAuditLogCleanup() -> None:
"""
Scheduled task to cleanup old audit log entries.
Called by the event scheduler.
"""
try:
retentionDays = int(APP_CONFIG.get("AUDIT_LOG_RETENTION_DAYS", 365))
deletedCount = audit_logger.cleanupOldEntries(retentionDays=retentionDays)
logger.info(f"Scheduled audit log cleanup completed: {deletedCount} entries removed")
except Exception as e:
logger.error(f"Scheduled audit log cleanup failed: {e}")
def registerAuditLogCleanupScheduler() -> None:
"""
Register the audit log cleanup job with the event scheduler.
Should be called during application startup.
"""
try:
from modules.shared.eventManagement import eventManager
# Run cleanup daily at 3 AM
eventManager.registerCron(
jobId="audit_log_cleanup",
func=runAuditLogCleanup,
cronKwargs={
"hour": "3",
"minute": "0"
}
)
logger.info("Audit log cleanup scheduler registered (daily at 03:00)")
except Exception as e:
logger.error(f"Failed to register audit log cleanup scheduler: {e}")