gateway/modules/shared/auditLogger.py

202 lines
7.1 KiB
Python

"""
Audit Logging System for PowerOn Gateway
This module provides centralized audit logging functionality for security events,
user actions, and system access patterns.
"""
import logging
import os
from datetime import datetime
from typing import Optional, Dict, Any
from logging.handlers import RotatingFileHandler
from modules.shared.configuration import APP_CONFIG
class DailyRotatingFileHandler(RotatingFileHandler):
"""
A rotating file handler that automatically switches to a new file when the date changes.
The log file name includes the current date and switches at midnight.
"""
def __init__(self, logDir, filenamePrefix, maxBytes=10485760, backupCount=5, **kwargs):
self.logDir = logDir
self.filenamePrefix = filenamePrefix
self.currentDate = None
self.currentFile = None
# Initialize with today's file
self._updateFileIfNeeded()
# Call parent constructor with current file
super().__init__(self.currentFile, maxBytes=maxBytes, backupCount=backupCount, **kwargs)
def _updateFileIfNeeded(self):
"""Update the log file if the date has changed"""
today = datetime.now().strftime("%Y%m%d")
if self.currentDate != today:
self.currentDate = today
newFile = os.path.join(self.logDir, f"{self.filenamePrefix}_{today}.log")
if self.currentFile != newFile:
self.currentFile = newFile
return True
return False
def emit(self, record):
"""Emit a log record, switching files if date has changed"""
# Check if we need to switch to a new file
if self._updateFileIfNeeded():
# Close current file and open new one
if self.stream:
self.stream.close()
self.stream = None
# Update the baseFilename for the parent class
self.baseFilename = self.currentFile
# Reopen the stream
if not self.delay:
self.stream = self._open()
# Call parent emit method
super().emit(record)
class AuditLogger:
"""Centralized audit logging system"""
def __init__(self):
self.logger = None
self._setupAuditLogger()
def _setupAuditLogger(self):
"""Setup the audit logger with daily file rotation"""
try:
# Get log directory from config
logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
if not os.path.isabs(logDir):
# If relative path, make it relative to the gateway directory
gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logDir = os.path.join(gatewayDir, logDir)
# Ensure log directory exists
os.makedirs(logDir, exist_ok=True)
# Create audit logger
self.logger = logging.getLogger('audit')
self.logger.setLevel(logging.INFO)
# Remove any existing handlers to avoid duplicates
for handler in self.logger.handlers[:]:
self.logger.removeHandler(handler)
# Create daily rotating file handler for audit log
rotationSize = int(APP_CONFIG.get("APP_LOGGING_ROTATION_SIZE", 10485760)) # Default: 10MB
backupCount = int(APP_CONFIG.get("APP_LOGGING_BACKUP_COUNT", 5))
fileHandler = DailyRotatingFileHandler(
logDir=logDir,
filenamePrefix="log_audit",
maxBytes=rotationSize,
backupCount=backupCount
)
# Create formatter for audit log
auditFormatter = logging.Formatter(
fmt="%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
fileHandler.setFormatter(auditFormatter)
# Add handler to logger
self.logger.addHandler(fileHandler)
# Prevent propagation to root logger
self.logger.propagate = False
except Exception as e:
# Fallback to standard logger if audit setup fails
self.logger = logging.getLogger(__name__)
self.logger.error(f"Failed to setup audit logger: {str(e)}")
def logEvent(self,
userId: str,
mandateId: str,
category: str,
action: str,
details: str = "",
timestamp: Optional[datetime] = None) -> None:
"""
Log an audit event
Args:
userId: User identifier
mandateId: Mandate identifier (can be empty if not applicable)
category: Event category (e.g., 'key', 'access', 'data')
action: Specific action (e.g., 'decode', 'login', 'logout')
details: Additional details about the event
timestamp: Optional custom timestamp (defaults to current time)
"""
try:
if not self.logger:
return
# Use provided timestamp or current time
if timestamp is None:
timestamp = datetime.now()
# Format the audit log entry
# Format: timestamp | userid | mandateid | category | action | details
auditEntry = f"{userId} | {mandateId} | {category} | {action} | {details}"
# Log the event
self.logger.info(auditEntry)
except Exception as e:
# Use standard logger as fallback
logging.getLogger(__name__).error(f"Failed to log audit event: {str(e)}")
def logKeyAccess(self, userId: str, mandateId: str, keyName: str, action: str) -> None:
"""Log key access events (decode/encode)"""
self.logEvent(
userId=userId,
mandateId=mandateId,
category="key",
action=action,
details=keyName
)
def logUserAccess(self, userId: str, mandateId: str, action: str, successInfo: str = "") -> None:
"""Log user access events (login/logout)"""
self.logEvent(
userId=userId,
mandateId=mandateId,
category="access",
action=action,
details=successInfo
)
def logDataAccess(self, userId: str, mandateId: str, action: str, details: str = "") -> None:
"""Log data access events"""
self.logEvent(
userId=userId,
mandateId=mandateId,
category="data",
action=action,
details=details
)
def logSecurityEvent(self, userId: str, mandateId: str, action: str, details: str = "") -> None:
"""Log security-related events"""
self.logEvent(
userId=userId,
mandateId=mandateId,
category="security",
action=action,
details=details
)
# Global audit logger instance
audit_logger = AuditLogger()