""" Utility module for configuration management. This module provides a global APP_CONFIG object for accessing configuration from both config.ini files and environment variables stored in .env files, using a flat structure. """ import os import logging import json import base64 import time from typing import Any, Dict, Optional from pathlib import Path from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC # audit_logger imported lazily to avoid circular import # Set up basic logging for configuration loading logging.basicConfig( level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s', handlers=[logging.StreamHandler()] ) # Configure logger logger = logging.getLogger(__name__) class Configuration: """ Configuration class with attribute-style access to flattened configuration. """ def __init__(self): """Initialize the configuration object""" self._data = {} self._configFilePath = None self._envFilePath = None self._configMtime = 0 self._envMtime = 0 self.refresh() def refresh(self): """Reload configuration from files""" self._loadConfig() self._loadEnv() logger.info("Configuration refreshed") def _loadConfig(self): """Load configuration from config.ini file in flattened format""" # Find config.ini file in the gateway directory configPath = Path(__file__).parent.parent.parent / 'config.ini' if not configPath.exists(): logger.warning(f"Configuration file not found at {configPath.absolute()}") return self._configFilePath = configPath currentMtime = os.path.getmtime(configPath) # Skip if file hasn't changed if currentMtime <= self._configMtime: return self._configMtime = currentMtime try: with open(configPath, 'r') as f: lines = f.readlines() i = 0 while i < len(lines): line = lines[i].strip() # Skip empty lines and comments if not line or line.startswith('#'): i += 1 continue # Parse key-value pairs if '=' in line: key, value = line.split('=', 1) key = key.strip() value = value.strip() # Check if value starts with { (JSON object) if value.startswith('{'): # Collect all lines until we find the closing } json_lines = [value] i += 1 brace_count = value.count('{') - value.count('}') while i < len(lines) and brace_count > 0: json_lines.append(lines[i].rstrip('\n')) brace_count += lines[i].count('{') - lines[i].count('}') i += 1 # Join all lines and parse as JSON value = '\n'.join(json_lines) i -= 1 # Adjust for the loop increment # Add to data dictionary self._data[key] = value i += 1 except Exception as e: logger.error(f"Error loading configuration: {e}") def _loadEnv(self): """Load environment variables from .env file""" # Find .env file in the gateway directory envPath = Path(__file__).parent.parent.parent / '.env' if not envPath.exists(): logger.warning(f"Environment file not found at {envPath.absolute()}") return self._envFilePath = envPath currentMtime = os.path.getmtime(envPath) # Skip if file hasn't changed if currentMtime <= self._envMtime: return self._envMtime = currentMtime try: with open(envPath, 'r') as f: lines = f.readlines() i = 0 while i < len(lines): line = lines[i].strip() # Skip empty lines and comments if not line or line.startswith('#'): i += 1 continue # Parse key-value pairs if '=' in line: key, value = line.split('=', 1) key = key.strip() value = value.strip() # Check if value starts with { (JSON object) if value.startswith('{'): # Collect all lines until we find the closing } json_lines = [value] i += 1 brace_count = value.count('{') - value.count('}') while i < len(lines) and brace_count > 0: json_lines.append(lines[i].rstrip('\n')) brace_count += lines[i].count('{') - lines[i].count('}') i += 1 # Join all lines and create the full JSON value full_json_value = '\n'.join(json_lines) self._data[key] = full_json_value else: # Single line value self._data[key] = value i += 1 logger.info(f"Loaded environment variables from {envPath.absolute()}") # Also load system environment variables (don't override existing) for key, value in os.environ.items(): if key not in self._data: self._data[key] = value except Exception as e: logger.error(f"Error loading environment variables: {e}") def checkForUpdates(self): """Check if configuration files have changed and reload if necessary""" if self._configFilePath and os.path.exists(self._configFilePath): currentMtime = os.path.getmtime(self._configFilePath) if currentMtime > self._configMtime: logger.info("Config file has changed, reloading...") self._loadConfig() if self._envFilePath and os.path.exists(self._envFilePath): currentMtime = os.path.getmtime(self._envFilePath) if currentMtime > self._envMtime: logger.info("Environment file has changed, reloading...") self._loadEnv() def get(self, key: str, default: Any = None, user_id: str = "system") -> Any: """Get configuration value with optional default""" self.checkForUpdates() # Check for file changes if key in self._data: value = self._data[key] # Handle secrets (keys ending with _SECRET) if key.endswith("_SECRET"): # Log audit event for secret key access try: from modules.shared.auditLogger import audit_logger audit_logger.log_key_access( user_id=user_id, mandate_id="system", key_name=key, action="decode" ) except Exception: # Don't fail if audit logging fails pass if value.startswith("{") and value.endswith("}"): # Handle JSON secrets (keys ending with _API_KEY that contain JSON) return handleSecretJson(value, user_id, key) else: return handleSecretText(value, user_id, key) return value return default def __getattr__(self, name: str) -> Any: """Enable attribute-style access to configuration""" self.checkForUpdates() # Check for file changes value = self.get(name, user_id="system") if value is None: raise AttributeError(f"Configuration key '{name}' not found") return value def __dir__(self) -> list: """Support auto-completion of attributes""" self.checkForUpdates() # Check for file changes return list(self._data.keys()) + super().__dir__() def set(self, key: str, value: Any) -> None: """Set a configuration value (for testing/overrides)""" self._data[key] = value def handleSecretText(value: str, user_id: str = "system", key_name: str = "unknown") -> str: """ Handle secret values with encryption/decryption support. Args: value: The secret value to handle (may be encrypted) user_id: The user ID making the request (default: "system") key_name: The name of the key being decrypted (default: "unknown") Returns: str: Processed secret value (decrypted if encrypted) """ if _is_encrypted_value(value): return decrypt_value(value, user_id, key_name) return value def handleSecretJson(value: str, user_id: str = "system", key_name: str = "unknown") -> str: """ Handle JSON secret values (like Google service account keys) with encryption/decryption support. Validates that the value is valid JSON after decryption. Args: value: The JSON secret value to handle (may be encrypted) user_id: The user ID making the request (default: "system") key_name: The name of the key being decrypted (default: "unknown") Returns: str: Processed JSON secret value (decrypted if encrypted) Raises: ValueError: If the value is not valid JSON after decryption """ # Decrypt if encrypted if _is_encrypted_value(value): decrypted_value = decrypt_value(value, user_id, key_name) else: decrypted_value = value try: # Validate that it's valid JSON json.loads(decrypted_value) return decrypted_value except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON in secret value: {e}") # Global rate limiting tracking # Structure: {user_id: {key_name: [timestamps]}} _decryption_attempts = {} def _get_master_key(env_type: str = None) -> bytes: """ Get the master key for the specified environment. Args: env_type: The environment type (dev, int, prod, etc.). If None, uses current config. Returns: bytes: The master key for encryption/decryption Raises: ValueError: If no master key is found """ # Get the key location from config key_location = APP_CONFIG.get('APP_KEY_SYSVAR') if env_type is None: env_type = APP_CONFIG.get('APP_ENV_TYPE', 'dev') if not key_location: raise ValueError("APP_KEY_SYSVAR not configured") # First try to get from environment variable master_key = os.environ.get(key_location) if master_key: # If found in environment, use it directly return master_key.encode('utf-8') # If not in environment, try to read from file if os.path.exists(key_location): try: with open(key_location, 'r') as f: content = f.read().strip() # Parse the key file format: env = key lines = content.split('\n') for line in lines: line = line.strip() if not line or line.startswith('#'): continue if '=' in line: key_env, key_value = line.split('=', 1) key_env = key_env.strip() key_value = key_value.strip() if key_env == env_type: return key_value.encode('utf-8') raise ValueError(f"No key found for environment '{env_type}' in {key_location}") except Exception as e: raise ValueError(f"Error reading key file {key_location}: {e}") raise ValueError(f"Master key not found. Checked environment variable '{key_location}' and file path") def _derive_encryption_key(master_key: bytes) -> bytes: """ Derive a 32-byte encryption key from the master key using PBKDF2. Args: master_key: The master key bytes Returns: bytes: 32-byte derived key suitable for Fernet """ # Use a fixed salt for consistency (in production, consider using a random salt stored separately) salt = b'poweron_config_salt_2025' kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) return base64.urlsafe_b64encode(kdf.derive(master_key)) def _is_encrypted_value(value: str) -> bool: """ Check if a value is encrypted (starts with environment-specific prefix). Args: value: The value to check Returns: bool: True if encrypted, False otherwise """ if not value or not isinstance(value, str): return False # Check for any environment-specific encryption prefixes return (value.startswith('DEV_ENC:') or value.startswith('INT_ENC:') or value.startswith('PROD_ENC:') or value.startswith('TEST_ENC:') or value.startswith('STAGING_ENC:')) def _get_encryption_prefix(env_type: str) -> str: """ Get the encryption prefix for the given environment type. Args: env_type: The environment type (dev, int, prod, etc.) Returns: str: The encryption prefix """ return f"{env_type.upper()}_ENC:" def _check_decryption_rate_limit(user_id: str, key_name: str, max_per_second: int = 10) -> bool: """ Check if decryption is allowed based on rate limiting (max 10 per second per user per key). Args: user_id: The user ID making the request key_name: The name of the key being decrypted max_per_second: Maximum decryptions per second (default: 10) Returns: bool: True if allowed, False if rate limited """ current_time = time.time() # Initialize tracking for this user if not exists if user_id not in _decryption_attempts: _decryption_attempts[user_id] = {} # Initialize tracking for this key if not exists if key_name not in _decryption_attempts[user_id]: _decryption_attempts[user_id][key_name] = [] # Clean old attempts (older than 1 second) _decryption_attempts[user_id][key_name] = [ timestamp for timestamp in _decryption_attempts[user_id][key_name] if current_time - timestamp < 1.0 ] # Check if we're within rate limit if len(_decryption_attempts[user_id][key_name]) >= max_per_second: logger.warning(f"Decryption rate limit exceeded for user '{user_id}' key '{key_name}' ({max_per_second}/sec)") return False # Record this attempt _decryption_attempts[user_id][key_name].append(current_time) return True def encrypt_value(value: str, env_type: str = None, user_id: str = "system", key_name: str = "unknown") -> str: """ Encrypt a value using the master key for the specified environment. Args: value: The plain text value to encrypt env_type: The environment type (dev, int, prod). If None, uses current environment. user_id: The user ID making the request (default: "system") key_name: The name of the key being encrypted (default: "unknown") Returns: str: The encrypted value with prefix Raises: ValueError: If encryption fails """ if env_type is None: env_type = APP_CONFIG.get('APP_ENV_TYPE', 'dev') try: master_key = _get_master_key(env_type) derived_key = _derive_encryption_key(master_key) fernet = Fernet(derived_key) # Encrypt the value encrypted_bytes = fernet.encrypt(value.encode('utf-8')) encrypted_b64 = base64.urlsafe_b64encode(encrypted_bytes).decode('utf-8') # Add environment prefix prefix = _get_encryption_prefix(env_type) encrypted_value = f"{prefix}{encrypted_b64}" # Log audit event for encryption try: from modules.shared.auditLogger import audit_logger audit_logger.log_key_access( user_id=user_id, mandate_id="system", key_name=key_name, action="encrypt" ) except Exception: # Don't fail if audit logging fails pass return encrypted_value except Exception as e: raise ValueError(f"Encryption failed: {e}") def decrypt_value(encrypted_value: str, user_id: str = "system", key_name: str = "unknown") -> str: """ Decrypt a value using the master key for the current environment. Args: encrypted_value: The encrypted value with prefix user_id: The user ID making the request (default: "system") key_name: The name of the key being decrypted (default: "unknown") Returns: str: The decrypted plain text value Raises: ValueError: If decryption fails """ if not _is_encrypted_value(encrypted_value): return encrypted_value # Return as-is if not encrypted # Check rate limiting (10 per second per user per key) if not _check_decryption_rate_limit(user_id, key_name, max_per_second=10): raise ValueError(f"Decryption rate limit exceeded for user '{user_id}' key '{key_name}' (10/sec)") try: # Extract environment type from prefix if encrypted_value.startswith('DEV_ENC:'): env_type = 'dev' prefix = 'DEV_ENC:' elif encrypted_value.startswith('INT_ENC:'): env_type = 'int' prefix = 'INT_ENC:' elif encrypted_value.startswith('PROD_ENC:'): env_type = 'prod' prefix = 'PROD_ENC:' elif encrypted_value.startswith('TEST_ENC:'): env_type = 'test' prefix = 'TEST_ENC:' elif encrypted_value.startswith('STAGING_ENC:'): env_type = 'staging' prefix = 'STAGING_ENC:' else: raise ValueError(f"Invalid encryption prefix. Expected DEV_ENC:, INT_ENC:, PROD_ENC:, TEST_ENC:, or STAGING_ENC:") encrypted_part = encrypted_value[len(prefix):] # Get master key for the specific environment and derive encryption key master_key = _get_master_key(env_type) derived_key = _derive_encryption_key(master_key) fernet = Fernet(derived_key) # Decode and decrypt encrypted_bytes = base64.urlsafe_b64decode(encrypted_part.encode('utf-8')) decrypted_bytes = fernet.decrypt(encrypted_bytes) decrypted_value = decrypted_bytes.decode('utf-8') # Log audit event for decryption try: from modules.shared.auditLogger import audit_logger audit_logger.log_key_access( user_id=user_id, mandate_id="system", key_name=key_name, action="decrypt" ) except Exception: # Don't fail if audit logging fails pass return decrypted_value except Exception as e: raise ValueError(f"Decryption failed: {e}") # Create the global APP_CONFIG instance APP_CONFIG = Configuration()