# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Audit Logging System for PowerOn Gateway

This module provides centralized audit logging functionality for security events,
user actions, and system access patterns.
"""

import logging
import os
from datetime import datetime
from typing import Optional, Dict, Any
from logging.handlers import RotatingFileHandler

from modules.shared.configuration import APP_CONFIG


class DailyRotatingFileHandler(RotatingFileHandler):
    """
    A rotating file handler whose target file name contains the current date.

    The file name is ``<filenamePrefix>_<YYYYMMDD>.log`` inside ``logDir``.
    The date is re-checked on every emitted record, so the switch to a new
    file happens on the first record written after the local date changes.
    ``maxBytes`` and ``backupCount`` are passed through unchanged to
    ``RotatingFileHandler``.
    """

    def __init__(self, logDir, filenamePrefix, maxBytes=10485760, backupCount=5, **kwargs):
        """
        Create the handler and open (or prepare) today's log file.

        Args:
            logDir: Directory in which the dated log files are created.
            filenamePrefix: Prefix placed before ``_<YYYYMMDD>.log``.
            maxBytes: Forwarded to ``RotatingFileHandler``.
            backupCount: Forwarded to ``RotatingFileHandler``.
            **kwargs: Any further ``RotatingFileHandler`` keyword arguments.
        """
        self.logDir = logDir
        self.filenamePrefix = filenamePrefix
        self.currentDate = None
        self.currentFile = None

        # The file name must be known before the parent constructor runs,
        # because the parent needs it as its ``filename`` argument.
        self._updateFileIfNeeded()

        super().__init__(self.currentFile, maxBytes=maxBytes, backupCount=backupCount, **kwargs)

    def _updateFileIfNeeded(self):
        """
        Recompute the dated file name if the local date has changed.

        Returns:
            True when ``currentFile`` was changed to a new path, False otherwise.
        """
        today = datetime.now().strftime("%Y%m%d")
        if self.currentDate == today:
            return False

        self.currentDate = today
        newFile = os.path.join(self.logDir, f"{self.filenamePrefix}_{today}.log")
        if self.currentFile == newFile:
            return False

        self.currentFile = newFile
        return True

    def emit(self, record):
        """Emit a log record, switching to a new dated file first if needed."""
        if self._updateFileIfNeeded():
            # Close the stream of the previous day's file.
            if self.stream:
                self.stream.close()
                self.stream = None

            # FileHandler.__init__ stores an absolute path in baseFilename;
            # keep that invariant so a relative logDir stays pinned to the
            # same location even if the working directory changes later.
            self.baseFilename = os.path.abspath(self.currentFile)

            # With delay=True the parent opens the stream lazily on write.
            if not self.delay:
                self.stream = self._open()

        super().emit(record)


class AuditLogger:
    """Centralized audit logging system"""

    def __init__(self):
        """Create the audit logger and attach its file handler."""
        self.logger = None
        self._setupAuditLogger()

    def _setupAuditLogger(self):
        """
        Setup the audit logger with daily file rotation.

        Reads ``APP_LOGGING_LOG_DIR``, ``APP_LOGGING_ROTATION_SIZE`` and
        ``APP_LOGGING_BACKUP_COUNT`` from ``APP_CONFIG``. On any failure the
        instance falls back to the module logger and records the error there.
        """
        try:
            # Get log directory from config
            logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
            if not os.path.isabs(logDir):
                # A relative path is resolved against the directory three
                # levels above this file (the gateway directory).
                gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
                logDir = os.path.join(gatewayDir, logDir)

            # Ensure log directory exists
            os.makedirs(logDir, exist_ok=True)

            # Create audit logger
            self.logger = logging.getLogger('audit')
            self.logger.setLevel(logging.INFO)

            # Remove any existing handlers to avoid duplicates, and close
            # them so their open file streams are released.
            for handler in self.logger.handlers[:]:
                self.logger.removeHandler(handler)
                handler.close()

            # Create daily rotating file handler for audit log
            rotationSize = int(APP_CONFIG.get("APP_LOGGING_ROTATION_SIZE", 10485760))  # Default: 10MB
            backupCount = int(APP_CONFIG.get("APP_LOGGING_BACKUP_COUNT", 5))

            fileHandler = DailyRotatingFileHandler(
                logDir=logDir,
                filenamePrefix="log_audit",
                maxBytes=rotationSize,
                backupCount=backupCount
            )

            # Create formatter for audit log
            auditFormatter = logging.Formatter(
                fmt="%(asctime)s | %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S"
            )
            fileHandler.setFormatter(auditFormatter)

            # Add handler to logger
            self.logger.addHandler(fileHandler)

            # Prevent propagation to root logger
            self.logger.propagate = False

        except Exception as e:
            # Fallback to standard logger if audit setup fails
            self.logger = logging.getLogger(__name__)
            self.logger.error(f"Failed to setup audit logger: {str(e)}")

    @staticmethod
    def _sanitizeField(value) -> str:
        """
        Return ``value`` as text with line breaks escaped.

        Each audit entry occupies exactly one line; escaping CR/LF prevents a
        field value from starting a forged additional entry.
        """
        return str(value).replace("\r", "\\r").replace("\n", "\\n")

    def logEvent(self,
                 userId: str,
                 mandateId: str,
                 category: str,
                 action: str,
                 details: str = "",
                 timestamp: Optional[datetime] = None) -> None:
        """
        Log an audit event

        The written line has the format
        ``timestamp | userid | mandateid | category | action | details``.

        Args:
            userId: User identifier
            mandateId: Mandate identifier (can be empty if not applicable)
            category: Event category (e.g., 'key', 'access', 'data')
            action: Specific action (e.g., 'decode', 'login', 'logout')
            details: Additional details about the event
            timestamp: Optional custom timestamp (defaults to current time).
                When given, it is used as the time shown in the log line.

        Errors are never raised to the caller; they are reported on the
        module logger instead.
        """
        try:
            if not self.logger:
                return

            # The leading timestamp column is added by the handler's formatter.
            fields = (userId, mandateId, category, action, details)
            auditEntry = " | ".join(self._sanitizeField(field) for field in fields)

            if timestamp is None:
                # Current time: the record's own creation time is used.
                self.logger.info(auditEntry)
                return

            if not self.logger.isEnabledFor(logging.INFO):
                return

            # Build the record manually so that its creation time, which the
            # formatter renders as %(asctime)s, reflects the given timestamp.
            record = self.logger.makeRecord(
                self.logger.name, logging.INFO, __file__, 0, auditEntry, (), None
            )
            record.created = timestamp.timestamp()
            record.msecs = (record.created - int(record.created)) * 1000
            self.logger.handle(record)

        except Exception as e:
            # Use standard logger as fallback
            logging.getLogger(__name__).error(f"Failed to log audit event: {str(e)}")

    def logKeyAccess(self, userId: str, mandateId: str, keyName: str, action: str) -> None:
        """Log key access events (decode/encode); ``keyName`` becomes the details field."""
        self.logEvent(
            userId=userId,
            mandateId=mandateId,
            category="key",
            action=action,
            details=keyName
        )

    def logUserAccess(self, userId: str, mandateId: str, action: str, successInfo: str = "") -> None:
        """Log user access events (login/logout); ``successInfo`` becomes the details field."""
        self.logEvent(
            userId=userId,
            mandateId=mandateId,
            category="access",
            action=action,
            details=successInfo
        )

    def logDataAccess(self, userId: str, mandateId: str, action: str, details: str = "") -> None:
        """Log data access events under the 'data' category."""
        self.logEvent(
            userId=userId,
            mandateId=mandateId,
            category="data",
            action=action,
            details=details
        )

    def logSecurityEvent(self, userId: str, mandateId: str, action: str, details: str = "") -> None:
        """Log security-related events under the 'security' category."""
        self.logEvent(
            userId=userId,
            mandateId=mandateId,
            category="security",
            action=action,
            details=details
        )


# Global audit logger instance
audit_logger = AuditLogger()