"""
Utility module for configuration management.

This module provides a global APP_CONFIG object for accessing configuration from both
config.ini files and environment variables stored in .env files, using a flat
(non-sectioned) key/value structure.
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import Any, Dict, Optional
|
|
from pathlib import Path
|
|
|
|
# Install a basic root-logger configuration so that problems found while
# loading configuration (WARNING and above) are emitted through a stream handler.
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    handlers=[logging.StreamHandler()],
)

# Module-level logger used by the configuration loader below
logger = logging.getLogger(__name__)
|
|
|
|
class Configuration:
    """
    Flat key/value configuration with attribute-style access.

    Values are collected into a single dictionary from three sources:

    1. ``config.ini`` (plain ``KEY = value`` lines, no sections),
    2. ``.env`` (same line format; overrides ``config.ini`` when loaded after it),
    3. the process environment (``os.environ``), which only fills in keys
       that are not already present.

    Both files are looked up three directory levels above this module. Their
    modification times are remembered so that every read can cheaply detect
    an edited file and reload it.
    """

    def __init__(self):
        """Create the configuration object and perform the initial load."""
        self._data: Dict[str, Any] = {}
        self._configFilePath: Optional[Path] = None
        self._envFilePath: Optional[Path] = None
        self._configMtime = 0
        self._envMtime = 0
        self.refresh()

    def refresh(self):
        """Reload configuration from files (files that have not changed are skipped)."""
        self._loadConfig()
        self._loadEnv()
        logger.info("Configuration refreshed")

    @staticmethod
    def _parseKeyValueFile(filePath) -> Dict[str, str]:
        """Parse ``KEY = value`` lines from *filePath* into a new dictionary.

        Blank lines, lines starting with ``#`` and lines without ``=`` are
        ignored. Only the first ``=`` separates key from value; both sides are
        stripped of surrounding whitespace.

        Raises:
            OSError: the file cannot be opened or read.
            UnicodeError: the file cannot be decoded.
        """
        parsed: Dict[str, str] = {}
        with open(filePath, 'r') as f:
            for rawLine in f:
                line = rawLine.strip()
                if not line or line.startswith('#'):
                    continue
                key, separator, value = line.partition('=')
                if separator:
                    parsed[key.strip()] = value.strip()
        return parsed

    @staticmethod
    def _fileChanged(filePath, lastMtime) -> bool:
        """Return True when *filePath* exists and is newer than *lastMtime*."""
        if not filePath:
            return False
        try:
            return os.path.getmtime(filePath) > lastMtime
        except OSError:
            # File vanished (or became unreadable) between checks: treat as
            # "nothing to reload" rather than failing the caller's lookup.
            return False

    def _loadConfig(self):
        """Load configuration from config.ini file in flattened format."""
        # Find config.ini file in the gateway directory
        configPath = Path(__file__).parent.parent.parent / 'config.ini'
        if not configPath.exists():
            logger.warning(f"Configuration file not found at {configPath.absolute()}")
            return

        self._configFilePath = configPath
        try:
            currentMtime = os.path.getmtime(configPath)
            # Skip if file hasn't changed
            if currentMtime <= self._configMtime:
                return
            self._configMtime = currentMtime
            # Parse completely first so a read error cannot leave a
            # half-applied file in the data dictionary.
            self._data.update(self._parseKeyValueFile(configPath))
        except (OSError, UnicodeError) as e:
            logger.error(f"Error loading configuration: {e}")

    def _loadEnv(self):
        """Load variables from the .env file, then fill gaps from os.environ.

        The process environment is merged even when the .env file is missing,
        unchanged or unreadable, so variables supplied by the system remain
        reachable through this object.
        """
        # Find .env file in the gateway directory
        envPath = Path(__file__).parent.parent.parent / '.env'
        if not envPath.exists():
            logger.warning(f"Environment file not found at {envPath.absolute()}")
        else:
            self._envFilePath = envPath
            try:
                currentMtime = os.path.getmtime(envPath)
                # Only re-read the file when it has changed
                if currentMtime > self._envMtime:
                    self._envMtime = currentMtime
                    self._data.update(self._parseKeyValueFile(envPath))
                    logger.info(f"Loaded environment variables from {envPath.absolute()}")
            except (OSError, UnicodeError) as e:
                logger.error(f"Error loading environment variables: {e}")

        # Also load system environment variables (don't override existing)
        for key, value in os.environ.items():
            self._data.setdefault(key, value)

    def checkForUpdates(self):
        """Check if configuration files have changed and reload if necessary."""
        if self._fileChanged(self._configFilePath, self._configMtime):
            logger.info("Config file has changed, reloading...")
            self._loadConfig()

        if self._fileChanged(self._envFilePath, self._envMtime):
            logger.info("Environment file has changed, reloading...")
            self._loadEnv()

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value stored under *key*, or *default* when absent.

        Keys ending in ``_SECRET`` are passed through ``handleSecret`` before
        being returned.
        """
        self.checkForUpdates()  # Check for file changes

        if key not in self._data:
            return default

        value = self._data[key]
        # Handle secrets (keys ending with _SECRET)
        if key.endswith("_SECRET"):
            return handleSecret(value)
        return value

    def __getattr__(self, name: str) -> Any:
        """Enable attribute-style access to configuration.

        Raises:
            AttributeError: the key is unknown or its stored value is None.
        """
        # __getattr__ only runs when normal lookup failed. If an internal
        # (underscore) attribute is missing, the instance is not initialised
        # yet (e.g. during copy/unpickle); going through get() would look up
        # those same attributes again and recurse without end.
        if name.startswith('_') and name not in self.__dict__.get('_data', ()):
            raise AttributeError(f"Configuration key '{name}' not found")

        # get() already checks the files for changes
        value = self.get(name)
        if value is None:
            raise AttributeError(f"Configuration key '{name}' not found")
        return value

    def __dir__(self) -> list:
        """Support auto-completion of attributes."""
        self.checkForUpdates()  # Check for file changes
        return list(self._data.keys()) + super().__dir__()

    def set(self, key: str, value: Any) -> None:
        """Set a configuration value (for testing/overrides)."""
        self._data[key] = value
|
|
|
|
def handleSecret(value: str) -> str:
    """
    Process a secret configuration value before it is handed to the caller.

    This is currently a pass-through: the stored text is returned unchanged.
    It exists as the single place where decryption could be added later.

    Args:
        value: The secret value as stored in the configuration

    Returns:
        str: The same value, unmodified
    """
    # No decryption is performed yet; the value is returned as-is
    return value
|
|
|
|
# Shared, module-wide configuration object; constructing it performs the
# initial load of the configuration sources at import time.
APP_CONFIG = Configuration()