gateway/modules/shared/configuration.py
2025-09-15 00:36:26 +02:00

232 lines
No EOL
8.1 KiB
Python

"""
Utility module for configuration management.
This module provides a global APP_CONFIG object for accessing configuration from both
config.ini files and environment variables stored in .env files, using a flat structure.
"""
import os
import logging
from typing import Any, Dict, Optional
from pathlib import Path
# Set up basic logging for configuration loading
logging.basicConfig(
level=logging.WARNING,
format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
handlers=[logging.StreamHandler()]
)
# Configure logger
logger = logging.getLogger(__name__)
class Configuration:
"""
Configuration class with attribute-style access to flattened configuration.
"""
def __init__(self):
"""Initialize the configuration object"""
self._data = {}
self._configFilePath = None
self._envFilePath = None
self._configMtime = 0
self._envMtime = 0
self.refresh()
def refresh(self):
"""Reload configuration from files"""
self._loadConfig()
self._loadEnv()
logger.info("Configuration refreshed")
def _loadConfig(self):
"""Load configuration from config.ini file in flattened format"""
# Find config.ini file in the gateway directory
configPath = Path(__file__).parent.parent.parent / 'config.ini'
if not configPath.exists():
logger.warning(f"Configuration file not found at {configPath.absolute()}")
return
self._configFilePath = configPath
currentMtime = os.path.getmtime(configPath)
# Skip if file hasn't changed
if currentMtime <= self._configMtime:
return
self._configMtime = currentMtime
try:
with open(configPath, 'r') as f:
lines = f.readlines()
i = 0
while i < len(lines):
line = lines[i].strip()
# Skip empty lines and comments
if not line or line.startswith('#'):
i += 1
continue
# Parse key-value pairs
if '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip()
# Check if value starts with { (JSON object)
if value.startswith('{'):
# Collect all lines until we find the closing }
json_lines = [value]
i += 1
brace_count = value.count('{') - value.count('}')
while i < len(lines) and brace_count > 0:
json_lines.append(lines[i].rstrip('\n'))
brace_count += lines[i].count('{') - lines[i].count('}')
i += 1
# Join all lines and parse as JSON
value = '\n'.join(json_lines)
i -= 1 # Adjust for the loop increment
# Add to data dictionary
self._data[key] = value
i += 1
except Exception as e:
logger.error(f"Error loading configuration: {e}")
def _loadEnv(self):
"""Load environment variables from .env file"""
# Find .env file in the gateway directory
envPath = Path(__file__).parent.parent.parent / '.env'
if not envPath.exists():
logger.warning(f"Environment file not found at {envPath.absolute()}")
return
self._envFilePath = envPath
currentMtime = os.path.getmtime(envPath)
# Skip if file hasn't changed
if currentMtime <= self._envMtime:
return
self._envMtime = currentMtime
try:
with open(envPath, 'r') as f:
for line in f:
line = line.strip()
# Skip empty lines and comments
if not line or line.startswith('#'):
continue
# Parse key-value pairs
if '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip()
# Add directly to data dictionary
self._data[key] = value
logger.info(f"Loaded environment variables from {envPath.absolute()}")
# Also load system environment variables (don't override existing)
for key, value in os.environ.items():
if key not in self._data:
self._data[key] = value
except Exception as e:
logger.error(f"Error loading environment variables: {e}")
def checkForUpdates(self):
"""Check if configuration files have changed and reload if necessary"""
if self._configFilePath and os.path.exists(self._configFilePath):
currentMtime = os.path.getmtime(self._configFilePath)
if currentMtime > self._configMtime:
logger.info("Config file has changed, reloading...")
self._loadConfig()
if self._envFilePath and os.path.exists(self._envFilePath):
currentMtime = os.path.getmtime(self._envFilePath)
if currentMtime > self._envMtime:
logger.info("Environment file has changed, reloading...")
self._loadEnv()
def get(self, key: str, default: Any = None) -> Any:
"""Get configuration value with optional default"""
self.checkForUpdates() # Check for file changes
if key in self._data:
value = self._data[key]
# Handle secrets (keys ending with _SECRET)
if key.endswith("_SECRET"):
return handleSecret(value)
# Handle JSON secrets (keys ending with _API_KEY that contain JSON)
elif key.endswith("_API_KEY") and value.startswith("{"):
return handleJsonSecret(value)
return value
return default
def __getattr__(self, name: str) -> Any:
"""Enable attribute-style access to configuration"""
self.checkForUpdates() # Check for file changes
value = self.get(name)
if value is None:
raise AttributeError(f"Configuration key '{name}' not found")
return value
def __dir__(self) -> list:
"""Support auto-completion of attributes"""
self.checkForUpdates() # Check for file changes
return list(self._data.keys()) + super().__dir__()
def set(self, key: str, value: Any) -> None:
"""Set a configuration value (for testing/overrides)"""
self._data[key] = value
def handleSecret(value: str) -> str:
"""
Handle secret values. Currently just returns the plain text value,
but can be enhanced to provide actual decryption in the future.
Args:
value: The secret value to handle
Returns:
str: Processed secret value
"""
# For now, just return the value as-is
# In the future, this could be enhanced to decrypt values
return value
def handleJsonSecret(value: str) -> str:
"""
Handle JSON secret values (like Google service account keys).
Validates that the value is valid JSON.
Args:
value: The JSON secret value to handle
Returns:
str: Processed JSON secret value
Raises:
ValueError: If the value is not valid JSON
"""
import json
try:
# Validate that it's valid JSON
json.loads(value)
return value
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in secret value: {e}")
# Create the global APP_CONFIG instance
APP_CONFIG = Configuration()