#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Tool for encrypting all *_SECRET variables in all environment files.

This tool automatically processes all three environment files (dev, int, prod)
and encrypts any unencrypted *_SECRET variables using the appropriate encryption
keys for each environment.

Usage:
    # Encrypt all secrets in all environment files
    python tool_security_encrypt_all_env_files.py

    # Dry run - show what would be changed without making changes
    python tool_security_encrypt_all_env_files.py --dry-run

    # Skip backup creation
    python tool_security_encrypt_all_env_files.py --no-backup

    # Process only specific environment files
    python tool_security_encrypt_all_env_files.py --files env_dev.env env_prod.env
"""
|
|
|
|
import sys
|
|
import argparse
|
|
import shutil
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any
|
|
|
|
# Locate the "modules" directory that sits next to this script and put it on
# the import path; without it the project imports below cannot work.
current_dir = Path(__file__).parent
modules_dir = current_dir / 'modules'
if not modules_dir.exists():
    print(f"Error: Modules directory not found: {modules_dir}")
    print(f"Make sure you're running this script from the gateway directory")
    sys.exit(1)
sys.path.insert(0, str(modules_dir))

# Load the project's encryption helper; stop with a hint when it is missing.
try:
    from modules.shared.configuration import encryptValue
except ImportError as import_error:
    print(f"Error: Could not import encryption functions from shared.configuration: {import_error}")
    print(f"Make sure you're running this script from the gateway directory")
    print(f"Modules directory: {modules_dir}")
    sys.exit(1)
|
|
|
|
def get_env_type_from_file(file_path: Path) -> str:
    """
    Read the APP_ENV_TYPE setting from an environment file.

    The file is scanned line by line for an assignment whose key is exactly
    ``APP_ENV_TYPE``. Keys that merely begin with that text (for example
    ``APP_ENV_TYPE_LABEL``) are ignored, and an assignment with an empty
    value is treated as "not set".

    Args:
        file_path: Path to the environment file

    Returns:
        str: The first non-empty APP_ENV_TYPE value, stripped and lower-cased,
        or 'dev' when the file is missing, cannot be read, or contains no
        usable APP_ENV_TYPE entry
    """
    default_env_type = 'dev'

    if not file_path.exists():
        return default_env_type

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for raw_line in f:
                key, separator, value = raw_line.partition('=')
                # Require a real assignment to exactly this key; a prefix
                # match would also pick up unrelated longer key names.
                if not separator or key.strip() != 'APP_ENV_TYPE':
                    continue
                env_type = value.strip().lower()
                if env_type:
                    return env_type
    except (OSError, UnicodeError) as e:
        # Best effort: an unreadable file falls back to the default below.
        print(f"Warning: Could not read APP_ENV_TYPE from {file_path}: {e}")

    return default_env_type
|
|
|
|
def is_any_encrypted_value(value: str) -> bool:
    """
    Tell whether a value already carries one of the known encryption prefixes.

    The recognised prefixes are DEV_ENC:, INT_ENC:, PROD_ENC:, TEST_ENC: and
    STAGING_ENC:. The match is case-sensitive and must be at the very start
    of the string.

    Args:
        value: The value to check

    Returns:
        bool: True if the value is a string starting with one of the prefixes,
        False otherwise (including empty strings and non-string input)
    """
    known_prefixes = ('DEV_ENC:', 'INT_ENC:', 'PROD_ENC:', 'TEST_ENC:', 'STAGING_ENC:')

    # An empty string cannot start with any prefix, so it needs no extra check.
    if isinstance(value, str):
        return value.startswith(known_prefixes)
    return False
|
|
|
|
def find_secret_keys_in_file(file_path: Path) -> list:
    """
    Find all *_SECRET keys in an environment file that are not encrypted.

    A line counts as a secret when it is a ``KEY = value`` assignment, the key
    ends with ``_SECRET``, the value is non-empty and is_any_encrypted_value
    reports no encryption prefix. Blank lines and lines starting with ``#``
    are skipped. A value starting with ``{`` is treated as JSON and may
    continue over the following lines until its braces balance.

    Args:
        file_path: Path to the environment file

    Returns:
        list: List of tuples (line_number, key, value, full_line), where
        line_number is the 1-based line of the assignment, value is the
        complete (possibly multi-line, newline-joined) value and full_line is
        the stripped text of the assignment line. Empty when the file does not
        exist; on a read error the entries gathered so far are returned.
    """
    secret_keys = []

    if not file_path.exists():
        return secret_keys

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        i = 0
        while i < len(lines):
            line = lines[i].strip()
            line_number = i + 1
            i += 1  # From here on, i points at the next unread line.

            # Skip empty lines, comments and anything that is not an assignment
            if not line or line.startswith('#') or '=' not in line:
                continue

            key, value = (part.strip() for part in line.split('=', 1))

            # Only unencrypted, non-empty *_SECRET values are of interest
            if not key.endswith('_SECRET') or not value or is_any_encrypted_value(value):
                continue

            if value.startswith('{'):
                # Swallow the continuation lines so they are not parsed as
                # separate assignments on the next iteration.
                value, i = _collect_json_value(lines, i, value)

            secret_keys.append((line_number, key, value, line))

    except (OSError, UnicodeError) as e:
        print(f"Error reading {file_path}: {e}")

    return secret_keys


def _collect_json_value(lines: list, next_index: int, first_fragment: str) -> tuple:
    """Join a brace-delimited value with its continuation lines.

    Returns a tuple (joined_value, index_of_first_unconsumed_line).
    """
    fragments = [first_fragment]
    # Plain brace balance: braces inside JSON string literals are counted too.
    open_braces = first_fragment.count('{') - first_fragment.count('}')

    while open_braces > 0 and next_index < len(lines):
        raw_line = lines[next_index]
        fragments.append(raw_line.rstrip('\n'))
        open_braces += raw_line.count('{') - raw_line.count('}')
        next_index += 1

    return '\n'.join(fragments), next_index
|
|
|
|
def backup_file(file_path: Path) -> Path:
    """
    Copy a file to a timestamped sibling before it is modified.

    The copy is placed next to the original. Its name is the original name
    with the final suffix replaced by ``.<YYYYMMDD_HHMMSS>.backup``, using the
    current local time. The copy is made with shutil.copy2.

    Args:
        file_path: Path to the file to backup

    Returns:
        Path: Path to the backup file
    """
    stamp = f"{datetime.now():%Y%m%d_%H%M%S}"
    destination = file_path.with_suffix('.' + stamp + '.backup')
    shutil.copy2(file_path, destination)
    return destination
|
|
|
|
def encrypt_all_secrets_in_file(file_path: Path, dry_run: bool = False, create_backup: bool = True) -> Dict[str, Any]:
    """
    Encrypt every unencrypted *_SECRET entry of one environment file in place.

    The environment type is read from the file itself and handed to
    encryptValue for each secret reported by find_secret_keys_in_file. The
    first line of each secret is rewritten as ``KEY = <encrypted>``; the
    continuation lines of a multi-line JSON value are emptied. The file is
    written back only when at least one secret was encrypted.

    Args:
        file_path: Path to the environment file
        dry_run: If True, only list the secrets that would be encrypted
        create_backup: If True, copy the file with backup_file before changing
            it; a failed backup is recorded as an error but processing continues

    Returns:
        dict: Keys 'file', 'env_type', 'secrets_found', 'secrets_encrypted',
        'errors' (list of messages) and 'backup_created' (backup path as a
        string, or None)
    """
    env_type = get_env_type_from_file(file_path)

    results = dict(
        file=str(file_path),
        env_type=env_type,
        secrets_found=0,
        secrets_encrypted=0,
        errors=[],
        backup_created=None,
    )

    pending_secrets = find_secret_keys_in_file(file_path)
    found_count = len(pending_secrets)
    results['secrets_found'] = found_count

    if found_count == 0:
        print(f" ✅ No unencrypted secrets found - all values already have encryption prefixes")
        return results

    print(f" Found {found_count} non-encrypted secrets")

    if dry_run:
        print(" [DRY RUN] Would encrypt the following secrets:")
        # NOTE(review): this prints up to 50 characters of each plaintext secret.
        for line_num, key, value, _ in pending_secrets:
            preview = value[:50]
            suffix = '...' if len(value) > 50 else ''
            print(f" Line {line_num}: {key} = {preview}{suffix}")
        return results

    if create_backup:
        try:
            backup_path = backup_file(file_path)
            results['backup_created'] = str(backup_path)
            print(f" 📋 Backup created: {backup_path.name}")
        except Exception as backup_error:
            # Deliberately non-fatal: the failure is reported and work goes on.
            results['errors'].append(f"Failed to create backup: {backup_error}")
            print(f" ⚠️ Warning: Could not create backup: {backup_error}")

    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            lines = list(handle)
    except Exception as read_error:
        results['errors'].append(f"Failed to read file: {read_error}")
        return results

    for line_num, key, value, _ in pending_secrets:
        try:
            print(f" 🔐 Encrypting {key}...")

            encrypted_value = encryptValue(value, env_type)

            # line_num is 1-based; the assignment line gets the new value.
            lines[line_num - 1] = f"{key} = {encrypted_value}\n"

            if value.startswith('{') and '\n' in value:
                # Every newline in the value is one continuation line that
                # followed the assignment; empty those lines out.
                continuation_count = value.count('\n')
                last_index = min(line_num + continuation_count, len(lines))
                for index in range(line_num, last_index):
                    lines[index] = ""

            results['secrets_encrypted'] += 1
            print(f" ✓ Encrypted successfully")

        except Exception as encrypt_error:
            error_msg = f"Failed to encrypt {key}: {encrypt_error}"
            results['errors'].append(error_msg)
            print(f" ✗ {error_msg}")

    if results['secrets_encrypted'] > 0:
        try:
            with open(file_path, 'w', encoding='utf-8') as handle:
                handle.writelines(lines)
            print(f" 💾 File updated successfully")
        except Exception as write_error:
            results['errors'].append(f"Failed to write file: {write_error}")
            print(f" ✗ Failed to write file: {write_error}")

    return results
|
|
|
|
def process_all_env_files(env_files: List[str] = None, dry_run: bool = False, create_backup: bool = True) -> Dict[str, Any]:
    """
    Run the secret encryption over a set of environment files and report totals.

    Files that do not exist are reported and skipped. Each remaining file is
    passed to encrypt_all_secrets_in_file, after which a summary, the created
    backups and all collected errors are printed.

    Args:
        env_files: Files to process; None selects env_dev.env, env_int.env and
            env_prod.env
        dry_run: If True, only show what would be changed
        create_backup: If True, create backups before modifying

    Returns:
        dict: Keys 'total_files', 'total_secrets_found',
        'total_secrets_encrypted', 'total_errors' and 'files' (the per-file
        result dicts)
    """
    if env_files is None:
        candidates = ['env_dev.env', 'env_int.env', 'env_prod.env']
    else:
        candidates = env_files

    # Keep only the files that are actually present
    env_paths = []
    for candidate in candidates:
        candidate_path = Path(candidate)
        if candidate_path.exists():
            env_paths.append(candidate_path)
        else:
            print(f"⚠️ Warning: Environment file not found: {candidate}")

    if not env_paths:
        print("❌ No valid environment files found to process")
        return {'total_files': 0, 'total_secrets_found': 0, 'total_secrets_encrypted': 0, 'total_errors': 0, 'files': []}

    rule = "=" * 60

    print("🔐 PowerOn Batch Secret Encryption Tool")
    print(rule)
    print("⚠️ IMPORTANT: The tool will read APP_ENV_TYPE from each file itself")
    print("⚠️ Each file will be processed with its own environment-specific encryption")
    print()

    if dry_run:
        print("🔍 DRY RUN MODE - No changes will be made")
        print()

    # Handle the files one by one, keeping each per-file report
    all_results = []
    for env_path in env_paths:
        print(f"\n📁 Processing {env_path.name}:")
        all_results.append(encrypt_all_secrets_in_file(env_path, dry_run, create_backup))

    total_secrets_found = sum(report['secrets_found'] for report in all_results)
    total_secrets_encrypted = sum(report['secrets_encrypted'] for report in all_results)
    all_errors = [message for report in all_results for message in report['errors']]
    total_errors = len(all_errors)

    print("\n" + rule)
    print("📊 SUMMARY")
    print(rule)
    print(f"Files processed: {len(env_paths)}")
    print(f"Total secrets found: {total_secrets_found}")

    if dry_run:
        print(f"Secrets that would be encrypted: {total_secrets_found}")
    else:
        print(f"Total secrets encrypted: {total_secrets_encrypted}")
        print(f"Total errors: {total_errors}")

        if total_errors > 0:
            print(f"\n⚠️ Completed with {total_errors} errors")
        elif total_secrets_encrypted > 0:
            print("\n🎉 All secrets encrypted successfully!")
        else:
            print("\n✅ No secrets needed encryption")

    backups_created = [report['backup_created'] for report in all_results if report['backup_created']]
    if backups_created:
        print(f"\n📋 Backups created: {len(backups_created)}")
        for backup in backups_created:
            print(f" - {Path(backup).name}")

    if all_errors:
        print(f"\n❌ Errors encountered:")
        for error in all_errors:
            print(f" - {error}")

    return {
        'total_files': len(env_paths),
        'total_secrets_found': total_secrets_found,
        'total_secrets_encrypted': total_secrets_encrypted,
        'total_errors': total_errors,
        'files': all_results,
    }
|
|
|
|
def main():
    """
    Parse the command line and run the batch encryption.

    Options: --files/-f (one or more environment files), --dry-run and
    --no-backup.

    Returns:
        int: 0 when processing reported no errors, 1 when errors were reported
        or an exception was raised
    """
    parser = argparse.ArgumentParser(description='Encrypt all *_SECRET variables in all environment files')
    parser.add_argument(
        '--files', '-f',
        nargs='+',
        help='Specific environment files to process (default: all three env files)',
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be changed without making changes',
    )
    parser.add_argument(
        '--no-backup',
        action='store_true',
        help='Skip creating backup files',
    )

    args = parser.parse_args()

    try:
        summary = process_all_env_files(
            env_files=args.files,
            dry_run=args.dry_run,
            create_backup=not args.no_backup,
        )
        # Any recorded error turns into a non-zero exit status
        return 1 if summary['total_errors'] > 0 else 0
    except Exception as e:
        print(f"Error: {e}")
        return 1
|
|
|
|
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)
|