522 lines
21 KiB
Python
522 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Tool for encrypting configuration values.
|
|
|
|
This tool allows developers to encrypt secret values for use in configuration files.
|
|
It supports both text and JSON values and automatically determines the environment.
|
|
It can also encrypt all *_SECRET keys in an environment file at once.
|
|
|
|
Usage:
|
|
# Encrypt a single value
|
|
python script_security_encrypt_config_value.py --value "my_secret_value" --env dev
|
|
python script_security_encrypt_config_value.py --file "path/to/file.json" --env prod
|
|
|
|
# Encrypt all secrets in a file
|
|
python script_security_encrypt_config_value.py --encrypt-all env_dev.env --env dev
|
|
python script_security_encrypt_config_value.py --encrypt-all env_prod.env --env prod --dry-run
|
|
|
|
# Decrypt a value (for testing)
|
|
python script_security_encrypt_config_value.py --decrypt "DEV_ENC:encrypted_value"
|
|
|
|
# Verify master key is correct
|
|
python script_security_encrypt_config_value.py --verify "PROD_ENC:Z0FBQUFBQm8xSU5p..."
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import argparse
|
|
import shutil
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# Add the gateway directory to the Python path
|
|
scriptPath = Path(__file__).resolve()
|
|
gatewayPath = scriptPath.parent.parent
|
|
projectRoot = gatewayPath.parent
|
|
sys.path.insert(0, str(gatewayPath))
|
|
|
|
from modules.shared.configuration import encryptValue, decryptValue, _isEncryptedValue as isEncryptedValue
|
|
|
|
def get_env_type_from_file(file_path: Path) -> str:
    """
    Read the APP_ENV_TYPE setting from an environment file.

    Only a line whose key is exactly ``APP_ENV_TYPE`` is honoured; keys that
    merely begin with that text (for example ``APP_ENV_TYPE_LABEL``) are
    skipped. The first matching line wins.

    Args:
        file_path: Path to the environment file

    Returns:
        str: The lower-cased value of APP_ENV_TYPE, or 'dev' when the file is
        missing, cannot be read, or contains no APP_ENV_TYPE entry
    """
    if not file_path.exists():
        return 'dev'

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for raw_line in f:
                key, separator, value = raw_line.partition('=')
                # Compare the whole key: a prefix test would also accept
                # unrelated settings such as APP_ENV_TYPE_LABEL.
                if separator and key.strip() == 'APP_ENV_TYPE':
                    return value.strip().lower()
    except (OSError, UnicodeError) as e:
        # Best effort: an unreadable file falls back to the default below.
        print(f"Warning: Could not read APP_ENV_TYPE from {file_path}: {e}")

    return 'dev'
|
|
|
|
def is_any_encrypted_value(value: str) -> bool:
    """
    Tell whether a value carries one of the known environment encryption prefixes.

    Recognised prefixes: DEV_ENC:, INT_ENC:, PROD_ENC:, TEST_ENC:, STAGING_ENC:.

    Args:
        value: The value to check

    Returns:
        bool: True when value is a non-empty string that starts with one of
        the prefixes above, False otherwise
    """
    known_prefixes = ('DEV_ENC:', 'INT_ENC:', 'PROD_ENC:', 'TEST_ENC:', 'STAGING_ENC:')

    # Empty values and non-strings can never be encrypted values
    if not (value and isinstance(value, str)):
        return False

    return value.startswith(known_prefixes)
|
|
|
|
def find_secret_keys_in_file(file_path: Path) -> list:
    """
    Find all *_SECRET keys in an environment file whose values are not encrypted.

    A value counts as unencrypted when it is non-empty and
    is_any_encrypted_value() rejects it. A value that starts with ``{`` is
    treated as a JSON object that may span several lines: following lines are
    collected until the opening and closing braces balance (braces are counted
    textually, including any that appear inside string literals).

    The APP_ENV_TYPE of the file is read and printed for information only; it
    does not influence which keys are returned.

    Args:
        file_path: Path to the environment file

    Returns:
        list: List of tuples (line_number, key, value, full_line) where
        line_number is the 1-based line of the key, value is the stripped
        value (multi-line JSON joined with newlines) and full_line is the
        stripped first line of the entry. If reading fails part-way, the
        error is printed and the entries found so far are returned.
    """
    secret_keys = []

    if not file_path.exists():
        return secret_keys

    # Get the environment type from the file itself (informational output)
    file_env_type = get_env_type_from_file(file_path)
    print(f"📁 Detected environment type from file: {file_env_type.upper()}")

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        i = 0
        while i < len(lines):
            line = lines[i].strip()
            start_line = i + 1  # 1-based line number of this entry
            i += 1

            # Skip empty lines, comments and anything that is not key=value
            if not line or line.startswith('#') or '=' not in line:
                continue

            key, value = line.split('=', 1)
            key = key.strip()
            value = value.strip()

            # Only non-empty secrets without ANY encryption prefix are reported
            if not key.endswith('_SECRET') or not value or is_any_encrypted_value(value):
                continue

            if value.startswith('{'):
                # JSON object: absorb following lines until the braces balance
                json_lines = [value]
                brace_count = value.count('{') - value.count('}')

                while i < len(lines) and brace_count > 0:
                    json_lines.append(lines[i].rstrip('\n'))
                    brace_count += lines[i].count('{') - lines[i].count('}')
                    i += 1

                value = '\n'.join(json_lines)

            secret_keys.append((start_line, key, value, line))

    except Exception as e:
        print(f"Error reading {file_path}: {e}")

    return secret_keys
|
|
|
|
def backup_file(file_path: Path) -> Path:
    """
    Copy a file to a timestamped backup next to it before it is modified.

    The backup name is built with Path.with_suffix, so the original extension
    is replaced: ``env_dev.env`` becomes ``env_dev.<YYYYmmdd_HHMMSS>.backup``.
    The copy is made with shutil.copy2.

    Args:
        file_path: Path to the file to backup

    Returns:
        Path: Path to the backup file
    """
    stamp = f"{datetime.now():%Y%m%d_%H%M%S}"
    destination = file_path.with_suffix('.' + stamp + '.backup')
    shutil.copy2(file_path, destination)
    return destination
|
|
|
|
def encrypt_all_secrets_in_file(file_path: Path, env_type: str, dry_run: bool = False, create_backup: bool = True) -> dict:
    """
    Encrypt every non-encrypted *_SECRET value of an environment file in place.

    The environment used for encryption is the APP_ENV_TYPE read from the
    file; the env_type argument is accepted for compatibility but not used.
    A failed backup is recorded as an error and reported as a warning, but
    processing continues. The file is only rewritten when at least one secret
    was encrypted.

    Args:
        file_path: Path to the environment file
        env_type: The environment type (ignored - read from file instead)
        dry_run: If True, only show what would be changed
        create_backup: If True, create a backup before modifying

    Returns:
        dict: Results with the keys 'file', 'env_type', 'secrets_found',
        'secrets_encrypted', 'errors' (list of messages) and
        'backup_created' (backup path as string, or None)
    """
    # The file itself decides which environment its secrets belong to
    file_env_type = get_env_type_from_file(file_path)

    results = {
        'file': str(file_path),
        'env_type': file_env_type,
        'secrets_found': 0,
        'secrets_encrypted': 0,
        'errors': [],
        'backup_created': None
    }

    pending = find_secret_keys_in_file(file_path)
    results['secrets_found'] = len(pending)

    print(f"\n📁 Processing {file_path.name} ({file_env_type.upper()}):")

    if not pending:
        print(" ✅ No unencrypted secrets found - all values already have encryption prefixes")
        return results

    print(f" Found {len(pending)} non-encrypted secrets")

    if dry_run:
        print(" [DRY RUN] Would encrypt the following secrets:")
        for line_num, key, value, _ in pending:
            ellipsis = '...' if len(value) > 50 else ''
            print(f" Line {line_num}: {key} = {value[:50]}{ellipsis}")
        return results

    # Optional safety copy; a failure here is reported but does not stop the run
    if create_backup:
        try:
            backup_path = backup_file(file_path)
        except Exception as e:
            results['errors'].append(f"Failed to create backup: {e}")
            print(f" ⚠️ Warning: Could not create backup: {e}")
        else:
            results['backup_created'] = str(backup_path)
            print(f" 📋 Backup created: {backup_path.name}")

    # Load the current content so individual lines can be swapped out
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except Exception as e:
        results['errors'].append(f"Failed to read file: {e}")
        return results

    for line_num, key, value, _ in pending:
        try:
            print(f" 🔐 Encrypting {key}...")

            encrypted_value = encryptValue(value, file_env_type)

            # line_num is 1-based; the entry's first line receives the new value
            lines[line_num - 1] = f"{key} = {encrypted_value}\n"

            # A multiline JSON value occupied further lines: blank them out
            if value.startswith('{') and '\n' in value:
                continuation_count = value.count('\n')
                last_exclusive = min(line_num + continuation_count, len(lines))
                for idx in range(line_num, last_exclusive):
                    lines[idx] = ""

            results['secrets_encrypted'] += 1
            print(" ✓ Encrypted successfully")

        except Exception as e:
            error_msg = f"Failed to encrypt {key}: {e}"
            results['errors'].append(error_msg)
            print(f" ✗ {error_msg}")

    # Persist only when something actually changed
    if results['secrets_encrypted'] > 0:
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                f.writelines(lines)
            print(" 💾 File updated successfully")
        except Exception as e:
            results['errors'].append(f"Failed to write file: {e}")
            print(f" ✗ Failed to write file: {e}")

    return results
|
|
|
|
def main():
    """
    Command-line entry point: parse the arguments and run the selected mode.

    Modes, in order of precedence: --encrypt-all, --verify, --decrypt, and
    otherwise encryption of a single value taken from --value, --file or
    standard input.

    Returns:
        int: 0 on success, 1 when the selected mode reported a failure.
        Any unexpected exception is printed and terminates the process via
        sys.exit(1).
    """
    parser = argparse.ArgumentParser(description='Encrypt configuration values')
    parser.add_argument('--value', '-v', help='Plain text value to encrypt')
    parser.add_argument('--file', '-f', help='File containing the value to encrypt')
    parser.add_argument('--env', '-e', choices=['dev', 'int', 'prod'],
                        help='Environment type for single-value encryption '
                             '(IGNORED by --encrypt-all, which reads APP_ENV_TYPE from the file)')
    parser.add_argument('--decrypt', '-d', help='Decrypt an encrypted value (for testing)')
    parser.add_argument('--interactive', '-i', action='store_true',
                        help='Interactive mode - prompt for value')
    parser.add_argument('--encrypt-all', '-a', help='Encrypt all *_SECRET keys in the specified file')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be changed without making changes (for --encrypt-all)')
    parser.add_argument('--no-backup', action='store_true',
                        help='Skip creating backup files (for --encrypt-all)')
    parser.add_argument('--verify', '-V', help='Verify master key by attempting to decrypt a PROD_ENC: value')

    args = parser.parse_args()

    try:
        if args.encrypt_all:
            return _run_encrypt_all(args)
        if args.verify:
            return _run_verify(args)
        if args.decrypt:
            return _run_decrypt(args)
        return _run_encrypt_value(args)
    except Exception as e:
        # Top-level boundary: report anything unexpected and fail the process
        print(f"Error: {e}")
        sys.exit(1)


def _run_encrypt_all(args) -> int:
    """Encrypt all unencrypted *_SECRET values of the file given via --encrypt-all and print a summary."""
    file_path = Path(args.encrypt_all)
    if not file_path.exists():
        print(f"Error: File not found: {file_path}")
        return 1

    print("🔐 PowerOn Secret Encryption Tool")
    print("=" * 50)
    print("⚠️ IMPORTANT: The tool will read APP_ENV_TYPE from the file itself")
    print("⚠️ Any --env parameter will be IGNORED for safety")
    print()

    if args.dry_run:
        print("🔍 DRY RUN MODE - No changes will be made")
        print()

    results = encrypt_all_secrets_in_file(
        file_path,
        args.env or 'ignored',  # The env parameter is ignored anyway
        dry_run=args.dry_run,
        create_backup=not args.no_backup
    )
    errors = results['errors']

    # Summary
    print("\n" + "=" * 50)
    print("📊 SUMMARY")
    print("=" * 50)
    print(f"File processed: {file_path.name}")
    print(f"Secrets found: {results['secrets_found']}")

    if args.dry_run:
        print(f"Secrets that would be encrypted: {results['secrets_found']}")
    else:
        print(f"Secrets encrypted: {results['secrets_encrypted']}")
        print(f"Errors: {len(errors)}")

        if not errors and results['secrets_encrypted'] > 0:
            print("\n🎉 All secrets encrypted successfully!")
        elif errors:
            print(f"\n⚠️ Completed with {len(errors)} errors")
        else:
            print("\n✅ No secrets needed encryption")

    # Show backup information
    if results['backup_created']:
        print(f"\n📋 Backup created: {Path(results['backup_created']).name}")

    # Show errors if any
    if errors:
        print("\n❌ Errors encountered:")
        for error in errors:
            print(f" - {error}")

    return 1 if errors else 0


def _report_master_key_source() -> str:
    """Print where the master key appears to come from and return a description of that source."""
    key_location = os.environ.get('APP_KEY_SYSVAR', 'CONFIG_KEY')

    if os.environ.get(key_location):
        print(f" [OK] Found master key in environment variable: {key_location}")
        return f"environment variable '{key_location}'"

    # Not an environment variable: the location may be a path to a key file
    if os.path.exists(key_location):
        print(f" [OK] Found master key in file: {key_location}")
        return f"file '{key_location}'"

    # Try default key file location
    default_key_file = projectRoot / "local" / "key.txt"
    if default_key_file.exists():
        print(f" [OK] Found master key in default file: {default_key_file}")
        return f"file '{default_key_file}'"

    print(" [WARN] Warning: Could not determine key source, but key was found")
    return "unknown (found via _getMasterKey)"


def _run_verify(args) -> int:
    """Check that the available master key can decrypt the value given via --verify."""
    print("Verifying Master Key")
    print("=" * 50)

    encrypted_value = args.verify.strip()

    if not isEncryptedValue(encrypted_value):
        print("ERROR: Value does not appear to be encrypted (missing ENV_ENC: prefix)")
        print(" Expected format: PROD_ENC:... or DEV_ENC:... or INT_ENC:...")
        return 1

    # Determine environment from prefix
    env_type = 'unknown'
    for prefix, name in (('PROD_ENC:', 'prod'), ('INT_ENC:', 'int'), ('DEV_ENC:', 'dev')):
        if encrypted_value.startswith(prefix):
            env_type = name
            break

    print(f"Environment: {env_type.upper()}")
    print("Checking master key source...")

    key_source = 'unknown'
    try:
        # Import here to avoid circular imports
        from modules.shared.configuration import _getMasterKey

        try:
            # Called only to confirm a key is available; the ValueError
            # handler below reports the case where it is not.
            _getMasterKey(env_type)
            key_source = _report_master_key_source()
        except ValueError as e:
            print(f" [ERROR] Master key not found: {e}")
            return 1

        # Now try to decrypt
        print("Attempting decryption...")
        decrypted = decryptValue(encrypted_value)

        print("\n[SUCCESS] Master key is CORRECT!")
        print(f" Decrypted value (first 50 chars): {decrypted[:50]}{'...' if len(decrypted) > 50 else ''}")
        print(f" Master key source: {key_source}")
        return 0

    except ValueError as e:
        error_msg = str(e)
        if "Decryption failed" in error_msg or "InvalidToken" in error_msg or "decrypt" in error_msg.lower():
            print("\n[FAILED] Master key is INCORRECT or value is corrupted!")
            print(f" Error: {error_msg}")
            print(f" Master key source: {key_source}")
        else:
            print(f"\n[ERROR] {error_msg}")
        return 1
    except Exception as e:
        print(f"\n[ERROR] {str(e)}")
        return 1


def _run_decrypt(args) -> int:
    """Decrypt and print the value given via --decrypt."""
    if not isEncryptedValue(args.decrypt):
        print("Error: Value does not appear to be encrypted (missing ENV_ENC: prefix)")
        return 1

    decrypted = decryptValue(args.decrypt)
    print(f"Decrypted value: {decrypted}")
    return 0


def _run_encrypt_value(args) -> int:
    """Encrypt one value taken from --value, --file or standard input and print the result."""
    if args.value:
        value_to_encrypt = args.value
    elif args.file:
        if not os.path.exists(args.file):
            print(f"Error: File not found: {args.file}")
            return 1

        with open(args.file, 'r', encoding='utf-8') as f:
            value_to_encrypt = f.read().strip()
    else:
        # --interactive and "no source given" both read from standard input
        print("Enter the value to encrypt (press Ctrl+D when done):")
        try:
            value_to_encrypt = sys.stdin.read().strip()
        except EOFError:
            print("Error: No input provided")
            return 1

    if not value_to_encrypt:
        print("Error: No value provided to encrypt")
        return 1

    # Validate JSON if it looks like JSON
    if value_to_encrypt.strip().startswith('{'):
        try:
            json.loads(value_to_encrypt)
            print("✓ Valid JSON detected")
        except json.JSONDecodeError as e:
            print(f"Warning: Value looks like JSON but is invalid: {e}")
            try:
                response = input("Continue anyway? (y/N): ")
            except EOFError:
                # stdin is already exhausted when the value itself was read
                # from it; treat the missing answer as "no".
                response = ''
            if response.lower() != 'y':
                return 1

    # Encrypt the value
    encrypted_value = encryptValue(value_to_encrypt, args.env)

    print("\n✓ Encryption successful!")
    print(f"Environment: {args.env or 'current'}")
    print("Encrypted value:")
    print(f"{encrypted_value}")
    print("\nCopy the above value to your configuration file.")

    # Show usage example
    print("\nUsage in config file:")
    print(f"MY_SECRET_KEY = {encrypted_value}")
    return 0
|
|
|
|
if __name__ == '__main__':
    # Hand main()'s status to the shell so failures are visible to callers
    # and CI; sys.exit treats a None result as success (exit status 0).
    sys.exit(main())
|