#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Tool for encrypting all *_SECRET variables in all environment files.

This tool automatically processes all three environment files (dev, int, prod)
and encrypts any unencrypted *_SECRET variables using the appropriate
encryption keys for each environment.

Usage:
    # Encrypt all secrets in all environment files
    python script_security_encrypt_all_env_files.py

    # Dry run - show what would be changed without making changes
    python script_security_encrypt_all_env_files.py --dry-run

    # Skip backup creation
    python script_security_encrypt_all_env_files.py --no-backup

    # Process only specific environment files
    python script_security_encrypt_all_env_files.py --files env_dev.env env_prod.env
"""

import sys
import argparse
import shutil
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple

# Add the gateway directory to the Python path
scriptPath = Path(__file__).resolve()
gatewayPath = scriptPath.parent.parent
modulesDir = gatewayPath / "modules"

# Start-up problems are collected here and reported by main() instead of
# calling sys.exit() at import time, so importing this module never kills the
# interpreter. encryptValue stays None when the import is not possible.
encryptValue = None
_startupErrors: List[str] = []

if modulesDir.exists():
    sys.path.insert(0, str(gatewayPath))
    # Import encryption functions
    try:
        from modules.shared.configuration import encryptValue
    except ImportError as e:
        _startupErrors = [
            f"Error: Could not import encryption functions from shared.configuration: {e}",
            "Make sure you're running this script from the gateway directory",
            f"Modules directory: {modulesDir}",
        ]
else:
    _startupErrors = [
        f"Error: Modules directory not found: {modulesDir}",
        "Make sure you're running this script from the gateway directory",
    ]

# Prefixes that mark a value as already encrypted for some environment.
_ENCRYPTION_PREFIXES = ('DEV_ENC:', 'INT_ENC:', 'PROD_ENC:', 'TEST_ENC:', 'STAGING_ENC:')


def get_env_type_from_file(file_path: Path) -> str:
    """
    Read the APP_ENV_TYPE from the environment file.

    Only a line whose key is exactly ``APP_ENV_TYPE`` is used; keys that merely
    start with that text (e.g. ``APP_ENV_TYPE_LABEL``) are ignored.

    Args:
        file_path: Path to the environment file

    Returns:
        str: The lower-cased value of the first APP_ENV_TYPE line, or 'dev'
        when the file is missing, unreadable, or has no such line
    """
    if not file_path.exists():
        return 'dev'

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for raw_line in f:
                key, separator, value = raw_line.strip().partition('=')
                if separator and key.strip() == 'APP_ENV_TYPE':
                    return value.strip().lower()
    except (OSError, UnicodeError) as e:
        print(f"Warning: Could not read APP_ENV_TYPE from {file_path}: {e}")

    return 'dev'


def is_any_encrypted_value(value: str) -> bool:
    """
    Check if a value has any encryption prefix (DEV_ENC:, INT_ENC:, PROD_ENC:, etc.).

    Args:
        value: The value to check

    Returns:
        bool: True if the value is a non-empty string starting with one of the
        known encryption prefixes, False otherwise
    """
    if not value or not isinstance(value, str):
        return False

    return value.startswith(_ENCRYPTION_PREFIXES)


def _collect_json_value(lines: List[str], next_index: int,
                        first_value: str) -> Optional[Tuple[str, int]]:
    """
    Gather a brace-delimited value that may continue on the following lines.

    Braces are counted textually (braces inside quoted strings are counted too).

    Args:
        lines: All lines of the file, as returned by readlines()
        next_index: 0-based index of the line after the KEY = { line
        first_value: The text after '=' on the key's own line

    Returns:
        (full_value, index_after_value), or None when the end of the file is
        reached before the braces balance
    """
    parts = [first_value]
    depth = first_value.count('{') - first_value.count('}')
    index = next_index

    while index < len(lines) and depth > 0:
        parts.append(lines[index].rstrip('\n'))
        depth += lines[index].count('{') - lines[index].count('}')
        index += 1

    if depth > 0:
        return None
    return '\n'.join(parts), index


def find_secret_keys_in_file(file_path: Path) -> List[Tuple[int, str, str, str]]:
    """
    Find all *_SECRET keys in an environment file that are not encrypted.

    A value starting with '{' is treated as a JSON object that may span several
    lines; the following lines are appended (joined with newlines) until the
    braces balance. If they never balance, only the key's own line is used so
    that unrelated lines are not swallowed into the secret.

    Args:
        file_path: Path to the environment file

    Returns:
        list: List of tuples (line_number, key, value, full_line) where
        line_number is the 1-based line of the key and full_line is that line
        stripped of surrounding whitespace. Empty when the file is missing or
        cannot be read.
    """
    secret_keys: List[Tuple[int, str, str, str]] = []

    if not file_path.exists():
        return secret_keys

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except (OSError, UnicodeError) as e:
        print(f"Error reading {file_path}: {e}")
        return secret_keys

    i = 0
    while i < len(lines):
        line = lines[i].strip()
        i += 1
        line_number = i  # 1-based number of the line just consumed

        # Skip empty lines, comments and anything that is not KEY=VALUE
        if not line or line.startswith('#') or '=' not in line:
            continue

        key, value = (part.strip() for part in line.split('=', 1))

        # Only non-empty secrets without ANY encryption prefix are of interest
        if not key.endswith('_SECRET') or not value or is_any_encrypted_value(value):
            continue

        if value.startswith('{'):
            collected = _collect_json_value(lines, i, value)
            if collected is None:
                print(f"Warning: {key} (line {line_number}) starts with '{{' but its braces "
                      f"never close; treating it as a single-line value")
            else:
                # Continue scanning after the last line of the JSON value
                value, i = collected

        secret_keys.append((line_number, key, value, line))

    return secret_keys


def backup_file(file_path: Path) -> Path:
    """
    Create a backup of the file before modification.

    The backup is written next to the original; the original's suffix is
    replaced by '.<YYYYmmdd_HHMMSS>.backup'.

    Args:
        file_path: Path to the file to backup

    Returns:
        Path: Path to the backup file
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = file_path.with_suffix(f'.{timestamp}.backup')
    shutil.copy2(file_path, backup_path)
    return backup_path


def encrypt_all_secrets_in_file(file_path: Path, dry_run: bool = False,
                                create_backup: bool = True) -> Dict[str, Any]:
    """
    Encrypt all non-encrypted secrets in a file.

    Each secret's line is replaced by 'KEY = <encrypted>'; the continuation
    lines of a multi-line JSON value are emptied. The file is rewritten only
    when at least one secret was encrypted.

    Args:
        file_path: Path to the environment file
        dry_run: If True, only show what would be changed
        create_backup: If True, create a backup before modifying

    Returns:
        dict: Results of the encryption process with the keys 'file',
        'env_type', 'secrets_found', 'secrets_encrypted', 'errors' and
        'backup_created'
    """
    # Get the environment type from the file itself
    file_env_type = get_env_type_from_file(file_path)

    results: Dict[str, Any] = {
        'file': str(file_path),
        'env_type': file_env_type,
        'secrets_found': 0,
        'secrets_encrypted': 0,
        'errors': [],
        'backup_created': None
    }

    # Find all secret keys
    secret_keys = find_secret_keys_in_file(file_path)
    results['secrets_found'] = len(secret_keys)

    if not secret_keys:
        print(" ✅ No unencrypted secrets found - all values already have encryption prefixes")
        return results

    print(f" Found {len(secret_keys)} non-encrypted secrets")

    if dry_run:
        # NOTE(review): this preview prints the first 50 characters of each
        # plaintext secret - confirm that is acceptable where logs are kept.
        print(" [DRY RUN] Would encrypt the following secrets:")
        for line_num, key, value, _full_line in secret_keys:
            print(f" Line {line_num}: {key} = {value[:50]}{'...' if len(value) > 50 else ''}")
        return results

    # Without the encryption function nothing can be done; report it once
    # instead of failing separately for every secret.
    if encryptValue is None:
        error_msg = "Encryption function is not available (modules.shared.configuration could not be imported)"
        results['errors'].append(error_msg)
        print(f" ✗ {error_msg}")
        return results

    # Create backup if requested (a failed backup is reported but does not abort)
    if create_backup:
        try:
            backup_path = backup_file(file_path)
            results['backup_created'] = str(backup_path)
            print(f" 📋 Backup created: {backup_path.name}")
        except OSError as e:
            results['errors'].append(f"Failed to create backup: {e}")
            print(f" ⚠️ Warning: Could not create backup: {e}")

    # Read the file content
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except (OSError, UnicodeError) as e:
        results['errors'].append(f"Failed to read file: {e}")
        return results

    # Process each secret key
    for line_num, key, value, _full_line in secret_keys:
        try:
            print(f" 🔐 Encrypting {key}...")

            # Encrypt the value using the environment type from the file
            encrypted_value = encryptValue(value, file_env_type)

            # Replace the key's line (line_num is 1-based)
            lines[line_num - 1] = f"{key} = {encrypted_value}\n"

            # A multi-line JSON value occupies one extra line per newline in
            # it; empty those lines so line numbers of later secrets stay valid.
            extra_lines = value.count('\n') if value.startswith('{') else 0
            for index in range(line_num, min(line_num + extra_lines, len(lines))):
                lines[index] = ""

            results['secrets_encrypted'] += 1
            print(" ✓ Encrypted successfully")

        except Exception as e:  # encryptValue is opaque here; keep going per secret
            error_msg = f"Failed to encrypt {key}: {e}"
            results['errors'].append(error_msg)
            print(f" ✗ {error_msg}")

    # Write the modified content back to the file
    if results['secrets_encrypted'] > 0:
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                f.writelines(lines)
            print(" 💾 File updated successfully")
        except OSError as e:
            results['errors'].append(f"Failed to write file: {e}")
            print(f" ✗ Failed to write file: {e}")

    return results


def process_all_env_files(env_files: Optional[List[str]] = None, dry_run: bool = False,
                          create_backup: bool = True) -> Dict[str, Any]:
    """
    Process all environment files and encrypt unencrypted secrets.

    Args:
        env_files: List of specific files to process (if None, processes all
            three default files); paths are used as given, so relative paths
            are resolved against the current working directory
        dry_run: If True, only show what would be changed
        create_backup: If True, create backups before modifying

    Returns:
        dict: Summary of all processing results with the keys 'total_files',
        'total_secrets_found', 'total_secrets_encrypted', 'total_errors' and
        'files' (the per-file result dicts)
    """
    # Default environment files if none specified
    if env_files is None:
        env_files = ['env_dev.env', 'env_int.env', 'env_prod.env']

    # Convert to Path objects and keep only the files that exist
    env_paths = []
    for env_file in env_files:
        env_path = Path(env_file)
        if not env_path.exists():
            print(f"⚠️ Warning: Environment file not found: {env_file}")
            continue
        env_paths.append(env_path)

    if not env_paths:
        print("❌ No valid environment files found to process")
        return {'total_files': 0, 'total_secrets_found': 0, 'total_secrets_encrypted': 0,
                'total_errors': 0, 'files': []}

    print("🔐 PowerOn Batch Secret Encryption Tool")
    print("=" * 60)
    print("⚠️ IMPORTANT: The tool will read APP_ENV_TYPE from each file itself")
    print("⚠️ Each file will be processed with its own environment-specific encryption")
    print()

    if dry_run:
        print("🔍 DRY RUN MODE - No changes will be made")
        print()

    # Process each file
    all_results = []
    for env_path in env_paths:
        print(f"\n📁 Processing {env_path.name}:")
        all_results.append(encrypt_all_secrets_in_file(env_path, dry_run, create_backup))

    total_secrets_found = sum(r['secrets_found'] for r in all_results)
    total_secrets_encrypted = sum(r['secrets_encrypted'] for r in all_results)
    all_errors = [error for r in all_results for error in r['errors']]
    total_errors = len(all_errors)

    # Summary
    print("\n" + "=" * 60)
    print("📊 SUMMARY")
    print("=" * 60)
    print(f"Files processed: {len(env_paths)}")
    print(f"Total secrets found: {total_secrets_found}")

    if not dry_run:
        print(f"Total secrets encrypted: {total_secrets_encrypted}")
        print(f"Total errors: {total_errors}")

        if total_errors == 0 and total_secrets_encrypted > 0:
            print("\n🎉 All secrets encrypted successfully!")
        elif total_errors > 0:
            print(f"\n⚠️ Completed with {total_errors} errors")
        else:
            print("\n✅ No secrets needed encryption")
    else:
        print(f"Secrets that would be encrypted: {total_secrets_found}")

    # Show backup information
    backups_created = [r['backup_created'] for r in all_results if r['backup_created']]
    if backups_created:
        print(f"\n📋 Backups created: {len(backups_created)}")
        for backup in backups_created:
            print(f" - {Path(backup).name}")

    # Show errors if any
    if all_errors:
        print("\n❌ Errors encountered:")
        for error in all_errors:
            print(f" - {error}")

    return {
        'total_files': len(env_paths),
        'total_secrets_found': total_secrets_found,
        'total_secrets_encrypted': total_secrets_encrypted,
        'total_errors': total_errors,
        'files': all_results
    }


def main() -> int:
    """
    Command-line entry point.

    Returns:
        int: Process exit code - 1 when the gateway modules could not be
        loaded, when any file reported an error, or on an unexpected
        exception; 0 otherwise
    """
    # Report problems detected while loading the module (missing modules
    # directory or failed import) before doing anything else.
    if _startupErrors:
        for message in _startupErrors:
            print(message)
        return 1

    parser = argparse.ArgumentParser(
        description='Encrypt all *_SECRET variables in all environment files')
    parser.add_argument('--files', '-f', nargs='+',
                        help='Specific environment files to process (default: all three env files)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be changed without making changes')
    parser.add_argument('--no-backup', action='store_true',
                        help='Skip creating backup files')

    args = parser.parse_args()

    try:
        results = process_all_env_files(
            env_files=args.files,
            dry_run=args.dry_run,
            create_backup=not args.no_backup
        )
    except Exception as e:  # top-level boundary: report and signal failure
        print(f"Error: {e}")
        return 1

    # Return appropriate exit code
    return 1 if results['total_errors'] > 0 else 0


if __name__ == '__main__':
    sys.exit(main())