gateway/modules/shared/auditLogger.py
2025-09-22 00:39:15 +02:00

202 lines
7.2 KiB
Python

"""
Audit Logging System for PowerOn Gateway
This module provides centralized audit logging functionality for security events,
user actions, and system access patterns.
"""
import logging
import os
from datetime import datetime
from typing import Optional, Dict, Any
from logging.handlers import RotatingFileHandler
from modules.shared.configuration import APP_CONFIG
class DailyRotatingFileHandler(RotatingFileHandler):
"""
A rotating file handler that automatically switches to a new file when the date changes.
The log file name includes the current date and switches at midnight.
"""
def __init__(self, log_dir, filename_prefix, max_bytes=10485760, backup_count=5, **kwargs):
self.log_dir = log_dir
self.filename_prefix = filename_prefix
self.current_date = None
self.current_file = None
# Initialize with today's file
self._update_file_if_needed()
# Call parent constructor with current file
super().__init__(self.current_file, maxBytes=max_bytes, backupCount=backup_count, **kwargs)
def _update_file_if_needed(self):
"""Update the log file if the date has changed"""
today = datetime.now().strftime("%Y%m%d")
if self.current_date != today:
self.current_date = today
new_file = os.path.join(self.log_dir, f"{self.filename_prefix}_{today}.log")
if self.current_file != new_file:
self.current_file = new_file
return True
return False
def emit(self, record):
"""Emit a log record, switching files if date has changed"""
# Check if we need to switch to a new file
if self._update_file_if_needed():
# Close current file and open new one
if self.stream:
self.stream.close()
self.stream = None
# Update the baseFilename for the parent class
self.baseFilename = self.current_file
# Reopen the stream
if not self.delay:
self.stream = self._open()
# Call parent emit method
super().emit(record)
class AuditLogger:
"""Centralized audit logging system"""
def __init__(self):
self.logger = None
self._setup_audit_logger()
def _setup_audit_logger(self):
"""Setup the audit logger with daily file rotation"""
try:
# Get log directory from config
logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
if not os.path.isabs(logDir):
# If relative path, make it relative to the gateway directory
gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logDir = os.path.join(gatewayDir, logDir)
# Ensure log directory exists
os.makedirs(logDir, exist_ok=True)
# Create audit logger
self.logger = logging.getLogger('audit')
self.logger.setLevel(logging.INFO)
# Remove any existing handlers to avoid duplicates
for handler in self.logger.handlers[:]:
self.logger.removeHandler(handler)
# Create daily rotating file handler for audit log
rotationSize = int(APP_CONFIG.get("APP_LOGGING_ROTATION_SIZE", 10485760)) # Default: 10MB
backupCount = int(APP_CONFIG.get("APP_LOGGING_BACKUP_COUNT", 5))
fileHandler = DailyRotatingFileHandler(
log_dir=logDir,
filename_prefix="log_audit",
max_bytes=rotationSize,
backup_count=backupCount
)
# Create formatter for audit log
auditFormatter = logging.Formatter(
fmt="%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
fileHandler.setFormatter(auditFormatter)
# Add handler to logger
self.logger.addHandler(fileHandler)
# Prevent propagation to root logger
self.logger.propagate = False
except Exception as e:
# Fallback to standard logger if audit setup fails
self.logger = logging.getLogger(__name__)
self.logger.error(f"Failed to setup audit logger: {str(e)}")
def log_event(self,
user_id: str,
mandate_id: str,
category: str,
action: str,
details: str = "",
timestamp: Optional[datetime] = None) -> None:
"""
Log an audit event
Args:
user_id: User identifier
mandate_id: Mandate identifier (can be empty if not applicable)
category: Event category (e.g., 'key', 'access', 'data')
action: Specific action (e.g., 'decode', 'login', 'logout')
details: Additional details about the event
timestamp: Optional custom timestamp (defaults to current time)
"""
try:
if not self.logger:
return
# Use provided timestamp or current time
if timestamp is None:
timestamp = datetime.now()
# Format the audit log entry
# Format: timestamp | userid | mandateid | category | action | details
audit_entry = f"{user_id} | {mandate_id} | {category} | {action} | {details}"
# Log the event
self.logger.info(audit_entry)
except Exception as e:
# Use standard logger as fallback
logging.getLogger(__name__).error(f"Failed to log audit event: {str(e)}")
def log_key_access(self, user_id: str, mandate_id: str, key_name: str, action: str) -> None:
"""Log key access events (decode/encode)"""
self.log_event(
user_id=user_id,
mandate_id=mandate_id,
category="key",
action=action,
details=key_name
)
def log_user_access(self, user_id: str, mandate_id: str, action: str, success_info: str = "") -> None:
"""Log user access events (login/logout)"""
self.log_event(
user_id=user_id,
mandate_id=mandate_id,
category="access",
action=action,
details=success_info
)
def log_data_access(self, user_id: str, mandate_id: str, action: str, details: str = "") -> None:
"""Log data access events"""
self.log_event(
user_id=user_id,
mandate_id=mandate_id,
category="data",
action=action,
details=details
)
def log_security_event(self, user_id: str, mandate_id: str, action: str, details: str = "") -> None:
"""Log security-related events"""
self.log_event(
user_id=user_id,
mandate_id=mandate_id,
category="security",
action=action,
details=details
)
# Global audit logger instance
audit_logger = AuditLogger()