"""
Audit Logging System for PowerOn Gateway

This module provides centralized audit logging functionality for security
events, user actions, and system access patterns.
"""

import logging
import os
from datetime import datetime
from typing import Optional, Dict, Any
from logging.handlers import RotatingFileHandler
from modules.shared.configuration import APP_CONFIG


class DailyRotatingFileHandler(RotatingFileHandler):
    """
    A rotating file handler that switches to a new file when the date changes.

    The log file name has the form ``<filename_prefix>_<YYYYMMDD>.log`` inside
    ``log_dir``. The date is checked on every ``emit`` call, so the switch
    happens with the first record written after the local date changes.
    Size-based rotation (``max_bytes`` / ``backup_count``) is inherited from
    ``RotatingFileHandler`` and applies to the file of the current day.
    """

    def __init__(
        self,
        log_dir,
        filename_prefix: str,
        max_bytes: int = 10485760,
        backup_count: int = 5,
        **kwargs,
    ):
        """
        Create the handler and open (or prepare) today's log file.

        Args:
            log_dir: Directory that holds the log files. It is not created here.
            filename_prefix: Prefix of the log file name (before the date).
            max_bytes: Passed to ``RotatingFileHandler`` as ``maxBytes``.
            backup_count: Passed to ``RotatingFileHandler`` as ``backupCount``.
            **kwargs: Forwarded unchanged to ``RotatingFileHandler``.
        """
        # Resolve the directory once so every file name derived from it is
        # absolute, like the ``baseFilename`` the parent constructor stores,
        # and does not depend on later changes of the working directory.
        self.log_dir = os.path.abspath(log_dir)
        self.filename_prefix = filename_prefix
        self.current_date: Optional[str] = None
        self.current_file: Optional[str] = None

        # Compute today's file name before the parent constructor needs it.
        self._update_file_if_needed()

        super().__init__(
            self.current_file,
            maxBytes=max_bytes,
            backupCount=backup_count,
            **kwargs,
        )

    def _update_file_if_needed(self) -> bool:
        """
        Refresh ``current_date`` / ``current_file`` from the local date.

        Returns:
            True if ``current_file`` was changed to a new path, else False.
        """
        today = datetime.now().strftime("%Y%m%d")
        if self.current_date == today:
            return False

        self.current_date = today
        new_file = os.path.join(self.log_dir, f"{self.filename_prefix}_{today}.log")
        if self.current_file == new_file:
            return False

        self.current_file = new_file
        return True

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a log record, switching to a new file first if the date changed."""
        if self._update_file_if_needed():
            # Detach the old stream before closing it so a failing close()
            # cannot leave the handler pointing at a closed stream.
            old_stream, self.stream = self.stream, None
            if old_stream:
                old_stream.close()

            # The parent class opens and rotates whatever baseFilename names.
            self.baseFilename = self.current_file

            # With delay=True the parent opens the stream lazily on emit.
            if not self.delay:
                self.stream = self._open()

        super().emit(record)


class AuditLogger:
    """Centralized audit logging system"""

    def __init__(self):
        """Create the audit logger and attach its daily rotating file handler."""
        self.logger: Optional[logging.Logger] = None
        self._setup_audit_logger()

    def _setup_audit_logger(self) -> None:
        """
        Setup the audit logger with daily file rotation.

        Reads ``APP_LOGGING_LOG_DIR``, ``APP_LOGGING_ROTATION_SIZE`` and
        ``APP_LOGGING_BACKUP_COUNT`` from ``APP_CONFIG``. If anything fails,
        ``self.logger`` falls back to this module's standard logger and the
        failure is reported there.
        """
        try:
            # Get log directory from config
            log_dir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
            if not os.path.isabs(log_dir):
                # A relative path is resolved against the directory two levels
                # above the one containing this file (the gateway directory).
                gateway_dir = os.path.dirname(
                    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
                )
                log_dir = os.path.join(gateway_dir, log_dir)

            # Ensure log directory exists
            os.makedirs(log_dir, exist_ok=True)

            # Create audit logger
            self.logger = logging.getLogger('audit')
            self.logger.setLevel(logging.INFO)

            # Remove any existing handlers to avoid duplicates, and close them
            # so their open file streams are released instead of leaked.
            for handler in self.logger.handlers[:]:
                self.logger.removeHandler(handler)
                handler.close()

            # Create daily rotating file handler for audit log
            rotation_size = int(APP_CONFIG.get("APP_LOGGING_ROTATION_SIZE", 10485760))  # Default: 10MB
            backup_count = int(APP_CONFIG.get("APP_LOGGING_BACKUP_COUNT", 5))

            file_handler = DailyRotatingFileHandler(
                log_dir=log_dir,
                filename_prefix="log_audit",
                max_bytes=rotation_size,
                backup_count=backup_count
            )

            # Create formatter for audit log
            audit_formatter = logging.Formatter(
                fmt="%(asctime)s | %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S"
            )
            file_handler.setFormatter(audit_formatter)

            # Add handler to logger
            self.logger.addHandler(file_handler)

            # Prevent propagation to root logger
            self.logger.propagate = False

        except Exception as e:
            # Fallback to standard logger if audit setup fails
            self.logger = logging.getLogger(__name__)
            self.logger.error(f"Failed to setup audit logger: {str(e)}")

    @staticmethod
    def _sanitize_field(value) -> str:
        """Return ``value`` as text with CR/LF escaped so an entry stays on one line."""
        return str(value).replace("\r", "\\r").replace("\n", "\\n")

    def log_event(self,
                  user_id: str,
                  mandate_id: str,
                  category: str,
                  action: str,
                  details: str = "",
                  timestamp: Optional[datetime] = None) -> None:
        """
        Log an audit event.

        The written line has the form
        ``timestamp | userid | mandateid | category | action | details``;
        the leading timestamp is added by the handler's formatter.

        Args:
            user_id: User identifier
            mandate_id: Mandate identifier (can be empty if not applicable)
            category: Event category (e.g., 'key', 'access', 'data')
            action: Specific action (e.g., 'decode', 'login', 'logout')
            details: Additional details about the event
            timestamp: Optional custom timestamp (defaults to current time).
                When given, it replaces the record's creation time, so it is
                the time rendered at the start of the line.

        Any exception raised while logging is reported to this module's
        standard logger and not propagated to the caller.
        """
        try:
            if not self.logger:
                return

            # Carriage returns and line feeds are escaped in every field so a
            # value cannot split one event into several lines or fake an entry.
            audit_entry = " | ".join(
                self._sanitize_field(field)
                for field in (user_id, mandate_id, category, action, details)
            )

            if timestamp is None:
                # The logging framework stamps the record with the current time.
                self.logger.info(audit_entry)
                return

            # Logger.handle() does not check the level, so check it here to
            # keep the same filtering as Logger.info().
            if not self.logger.isEnabledFor(logging.INFO):
                return

            # Build the record by hand so its creation time can be overridden
            # with the caller-supplied timestamp before it is formatted.
            record = self.logger.makeRecord(
                self.logger.name, logging.INFO, __file__, 0, audit_entry, (), None
            )
            record.created = timestamp.timestamp()
            record.msecs = (record.created % 1) * 1000
            self.logger.handle(record)

        except Exception as e:
            # Use standard logger as fallback
            logging.getLogger(__name__).error(f"Failed to log audit event: {str(e)}")

    def log_key_access(self, user_id: str, mandate_id: str, key_name: str, action: str) -> None:
        """Log key access events (decode/encode); the key name is stored as details."""
        self.log_event(
            user_id=user_id,
            mandate_id=mandate_id,
            category="key",
            action=action,
            details=key_name
        )

    def log_user_access(self, user_id: str, mandate_id: str, action: str, success_info: str = "") -> None:
        """Log user access events (login/logout); ``success_info`` is stored as details."""
        self.log_event(
            user_id=user_id,
            mandate_id=mandate_id,
            category="access",
            action=action,
            details=success_info
        )

    def log_data_access(self, user_id: str, mandate_id: str, action: str, details: str = "") -> None:
        """Log data access events"""
        self.log_event(
            user_id=user_id,
            mandate_id=mandate_id,
            category="data",
            action=action,
            details=details
        )

    def log_security_event(self, user_id: str, mandate_id: str, action: str, details: str = "") -> None:
        """Log security-related events"""
        self.log_event(
            user_id=user_id,
            mandate_id=mandate_id,
            category="security",
            action=action,
            details=details
        )


# Global audit logger instance
audit_logger = AuditLogger()