gateway/scripts/script_security_encrypt_all_env_files.py
2026-01-21 10:34:42 +01:00

424 lines
15 KiB
Python

#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Tool for encrypting all *_SECRET variables in all environment files.
This tool automatically processes all three environment files (dev, int, prod)
and encrypts any unencrypted *_SECRET variables using the appropriate encryption
keys for each environment.
Usage:
# Encrypt all secrets in all environment files
python script_security_encrypt_all_env_files.py
# Dry run - show what would be changed without making changes
python script_security_encrypt_all_env_files.py --dry-run
# Skip backup creation
python script_security_encrypt_all_env_files.py --no-backup
# Process only specific environment files
python script_security_encrypt_all_env_files.py --files env_dev.env env_prod.env
"""
import sys
import argparse
import shutil
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any
# Add the gateway directory to the Python path
scriptPath = Path(__file__).resolve()
gatewayPath = scriptPath.parent.parent
modulesDir = gatewayPath / "modules"
if modulesDir.exists():
sys.path.insert(0, str(gatewayPath))
else:
print(f"Error: Modules directory not found: {modulesDir}")
print("Make sure you're running this script from the gateway directory")
sys.exit(1)
# Import encryption functions
try:
from modules.shared.configuration import encryptValue
except ImportError as e:
print(f"Error: Could not import encryption functions from shared.configuration: {e}")
print(f"Make sure you're running this script from the gateway directory")
print(f"Modules directory: {modulesDir}")
sys.exit(1)
def get_env_type_from_file(file_path: Path) -> str:
"""
Read the APP_ENV_TYPE from the environment file.
Args:
file_path: Path to the environment file
Returns:
str: The environment type (dev, int, prod) or 'dev' as default
"""
if not file_path.exists():
return 'dev'
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line.startswith('APP_ENV_TYPE') and '=' in line:
_, value = line.split('=', 1)
return value.strip().lower()
except Exception as e:
print(f"Warning: Could not read APP_ENV_TYPE from {file_path}: {e}")
return 'dev'
def is_any_encrypted_value(value: str) -> bool:
"""
Check if a value has any encryption prefix (DEV_ENC:, INT_ENC:, PROD_ENC:, etc.).
Args:
value: The value to check
Returns:
bool: True if the value has any encryption prefix, False otherwise
"""
if not value or not isinstance(value, str):
return False
# Check for any environment-specific encryption prefixes
return (value.startswith('DEV_ENC:') or
value.startswith('INT_ENC:') or
value.startswith('PROD_ENC:') or
value.startswith('TEST_ENC:') or
value.startswith('STAGING_ENC:'))
def find_secret_keys_in_file(file_path: Path) -> list:
"""
Find all *_SECRET keys in an environment file that are not encrypted.
Args:
file_path: Path to the environment file
Returns:
list: List of tuples (line_number, key, value, full_line)
"""
secret_keys = []
if not file_path.exists():
return secret_keys
# Get the environment type from the file itself
file_env_type = get_env_type_from_file(file_path)
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
i = 0
while i < len(lines):
line = lines[i].strip()
# Skip empty lines and comments
if not line or line.startswith('#'):
i += 1
continue
# Check if line contains a key-value pair
if '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip()
# Check if it's a secret key and not already encrypted with ANY prefix
if key.endswith('_SECRET') and value and not is_any_encrypted_value(value):
# Check if value starts with { (JSON object)
if value.startswith('{'):
# Collect all lines until we find the closing }
json_lines = [value]
start_line = i + 1
i += 1
brace_count = value.count('{') - value.count('}')
while i < len(lines) and brace_count > 0:
json_lines.append(lines[i].rstrip('\n'))
brace_count += lines[i].count('{') - lines[i].count('}')
i += 1
# Join all lines and create the full JSON value
full_json_value = '\n'.join(json_lines)
secret_keys.append((start_line, key, full_json_value, line))
i -= 1 # Adjust for the loop increment
else:
# Single line value
secret_keys.append((i + 1, key, value, line))
# Check if it's a secret key with multiline JSON (value is just "{")
elif key.endswith('_SECRET') and value == '{' and not is_any_encrypted_value(value):
# Collect all lines until we find the closing }
json_lines = [value]
start_line = i + 1
i += 1
brace_count = 1 # We already have one opening brace
while i < len(lines) and brace_count > 0:
json_lines.append(lines[i].rstrip('\n'))
brace_count += lines[i].count('{') - lines[i].count('}')
i += 1
# Join all lines and create the full JSON value
full_json_value = '\n'.join(json_lines)
secret_keys.append((start_line, key, full_json_value, line))
i -= 1 # Adjust for the loop increment
i += 1
except Exception as e:
print(f"Error reading {file_path}: {e}")
return secret_keys
def backup_file(file_path: Path) -> Path:
"""
Create a backup of the file before modification.
Args:
file_path: Path to the file to backup
Returns:
Path: Path to the backup file
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = file_path.with_suffix(f'.{timestamp}.backup')
shutil.copy2(file_path, backup_path)
return backup_path
def encrypt_all_secrets_in_file(file_path: Path, dry_run: bool = False, create_backup: bool = True) -> Dict[str, Any]:
"""
Encrypt all non-encrypted secrets in a file.
Args:
file_path: Path to the environment file
dry_run: If True, only show what would be changed
create_backup: If True, create a backup before modifying
Returns:
dict: Results of the encryption process
"""
# Get the environment type from the file itself
file_env_type = get_env_type_from_file(file_path)
results = {
'file': str(file_path),
'env_type': file_env_type,
'secrets_found': 0,
'secrets_encrypted': 0,
'errors': [],
'backup_created': None
}
# Find all secret keys
secret_keys = find_secret_keys_in_file(file_path)
results['secrets_found'] = len(secret_keys)
if not secret_keys:
print(f" ✅ No unencrypted secrets found - all values already have encryption prefixes")
return results
print(f" Found {len(secret_keys)} non-encrypted secrets")
if dry_run:
print(" [DRY RUN] Would encrypt the following secrets:")
for line_num, key, value, full_line in secret_keys:
print(f" Line {line_num}: {key} = {value[:50]}{'...' if len(value) > 50 else ''}")
return results
# Create backup if requested
if create_backup:
try:
backup_path = backup_file(file_path)
results['backup_created'] = str(backup_path)
print(f" 📋 Backup created: {backup_path.name}")
except Exception as e:
results['errors'].append(f"Failed to create backup: {e}")
print(f" ⚠️ Warning: Could not create backup: {e}")
# Read the file content
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
except Exception as e:
results['errors'].append(f"Failed to read file: {e}")
return results
# Process each secret key
for line_num, key, value, full_line in secret_keys:
try:
print(f" 🔐 Encrypting {key}...")
# Encrypt the value using the environment type from the file
encrypted_value = encryptValue(value, file_env_type)
# Replace the line in the file content
new_line = f"{key} = {encrypted_value}\n"
lines[line_num - 1] = new_line
# If this was a multiline JSON, we need to remove the remaining lines
if value.startswith('{') and '\n' in value:
# Count how many lines the original JSON spanned
json_lines = value.split('\n')
lines_to_remove = len(json_lines) - 1 # -1 because we already replaced the first line
# Remove the remaining lines
for i in range(line_num, line_num + lines_to_remove):
if i < len(lines):
lines[i] = ""
results['secrets_encrypted'] += 1
print(f" ✓ Encrypted successfully")
except Exception as e:
error_msg = f"Failed to encrypt {key}: {e}"
results['errors'].append(error_msg)
print(f"{error_msg}")
# Write the modified content back to the file
if results['secrets_encrypted'] > 0:
try:
with open(file_path, 'w', encoding='utf-8') as f:
f.writelines(lines)
print(f" 💾 File updated successfully")
except Exception as e:
results['errors'].append(f"Failed to write file: {e}")
print(f" ✗ Failed to write file: {e}")
return results
def process_all_env_files(env_files: List[str] = None, dry_run: bool = False, create_backup: bool = True) -> Dict[str, Any]:
"""
Process all environment files and encrypt unencrypted secrets.
Args:
env_files: List of specific files to process (if None, processes all three default files)
dry_run: If True, only show what would be changed
create_backup: If True, create backups before modifying
Returns:
dict: Summary of all processing results
"""
# Default environment files if none specified
if env_files is None:
env_files = ['env_dev.env', 'env_int.env', 'env_prod.env']
# Convert to Path objects and check if they exist
env_paths = []
for env_file in env_files:
env_path = Path(env_file)
if not env_path.exists():
print(f"⚠️ Warning: Environment file not found: {env_file}")
continue
env_paths.append(env_path)
if not env_paths:
print("❌ No valid environment files found to process")
return {'total_files': 0, 'total_secrets_found': 0, 'total_secrets_encrypted': 0, 'total_errors': 0, 'files': []}
print("🔐 PowerOn Batch Secret Encryption Tool")
print("=" * 60)
print("⚠️ IMPORTANT: The tool will read APP_ENV_TYPE from each file itself")
print("⚠️ Each file will be processed with its own environment-specific encryption")
print()
if dry_run:
print("🔍 DRY RUN MODE - No changes will be made")
print()
# Process each file
all_results = []
total_secrets_found = 0
total_secrets_encrypted = 0
total_errors = 0
for env_path in env_paths:
print(f"\n📁 Processing {env_path.name}:")
results = encrypt_all_secrets_in_file(env_path, dry_run, create_backup)
all_results.append(results)
total_secrets_found += results['secrets_found']
total_secrets_encrypted += results['secrets_encrypted']
total_errors += len(results['errors'])
# Summary
print("\n" + "=" * 60)
print("📊 SUMMARY")
print("=" * 60)
print(f"Files processed: {len(env_paths)}")
print(f"Total secrets found: {total_secrets_found}")
if not dry_run:
print(f"Total secrets encrypted: {total_secrets_encrypted}")
print(f"Total errors: {total_errors}")
if total_errors == 0 and total_secrets_encrypted > 0:
print("\n🎉 All secrets encrypted successfully!")
elif total_errors > 0:
print(f"\n⚠️ Completed with {total_errors} errors")
else:
print("\n✅ No secrets needed encryption")
else:
print(f"Secrets that would be encrypted: {total_secrets_found}")
# Show backup information
backups_created = [r['backup_created'] for r in all_results if r['backup_created']]
if backups_created:
print(f"\n📋 Backups created: {len(backups_created)}")
for backup in backups_created:
print(f" - {Path(backup).name}")
# Show errors if any
all_errors = []
for results in all_results:
all_errors.extend(results['errors'])
if all_errors:
print(f"\n❌ Errors encountered:")
for error in all_errors:
print(f" - {error}")
return {
'total_files': len(env_paths),
'total_secrets_found': total_secrets_found,
'total_secrets_encrypted': total_secrets_encrypted,
'total_errors': total_errors,
'files': all_results
}
def main():
parser = argparse.ArgumentParser(description='Encrypt all *_SECRET variables in all environment files')
parser.add_argument('--files', '-f', nargs='+',
help='Specific environment files to process (default: all three env files)')
parser.add_argument('--dry-run', action='store_true',
help='Show what would be changed without making changes')
parser.add_argument('--no-backup', action='store_true',
help='Skip creating backup files')
args = parser.parse_args()
try:
results = process_all_env_files(
env_files=args.files,
dry_run=args.dry_run,
create_backup=not args.no_backup
)
# Return appropriate exit code
if results['total_errors'] > 0:
return 1
return 0
except Exception as e:
print(f"Error: {e}")
return 1
if __name__ == '__main__':
sys.exit(main())