# gateway/tests/test_neutralizer/apprun.py
"""
DSGVO-konformer Daten-Neutralisierer für KI-Agentensysteme
Unterstützt TXT, JSON, CSV, Excel und Word-Dateien
Mehrsprachig: DE, EN, FR, IT
"""
import csv
import json
import logging
import shutil
import traceback
from datetime import datetime
from pathlib import Path
from typing import Any

import pandas as pd

from neutralizer import DataAnonymizer

# Define directories
SCRIPT_DIR = Path(__file__).parent
INPUT_DIR = SCRIPT_DIR / 'input'
OUTPUT_DIR = SCRIPT_DIR / 'output'
LOG_DIR = SCRIPT_DIR / 'logs'
LOG_MAPPING = LOG_DIR / 'log_mapping.csv'
LOG_REPLACEMENTS = LOG_DIR / 'log_replacements.csv'

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)


def setup_directories():
    """Create fresh output and log directories and initialize the log files."""
    # Close any existing log handlers so the log directory can be removed
    for handler in logger.handlers[:]:
        handler.close()
        logger.removeHandler(handler)
    # Clear and recreate the output directory
    if OUTPUT_DIR.exists():
        shutil.rmtree(OUTPUT_DIR)
    OUTPUT_DIR.mkdir()
    logger.info(f"Output directory '{OUTPUT_DIR}' created")
    # Clear and recreate the logs directory
    if LOG_DIR.exists():
        shutil.rmtree(LOG_DIR)
    LOG_DIR.mkdir()
    logger.info(f"Log directory '{LOG_DIR}' created")
    # Write the header row for the mapping log
    with open(LOG_MAPPING, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['timestamp', 'success', 'file_name', 'replaced_fields',
                         'content_type', 'headers', 'row_count'])
    # Write the header row for the replacements log
    with open(LOG_REPLACEMENTS, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['timestamp', 'success', 'file_name', 'original', 'replacement'])
    # Attach a file handler now that the log directory exists again
    file_handler = logging.FileHandler(LOG_DIR / 'app.log', encoding='utf-8')
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(file_handler)
    return OUTPUT_DIR, LOG_DIR


def log_mapping(log_dir: Path, file_name: str, success: bool, replaced_fields: list):
    """Append one row to the mapping log."""
    try:
        with open(log_dir / "log_mapping.csv", 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            # Column order must match the header written by setup_directories()
            writer.writerow([
                datetime.now().isoformat(),
                success,
                file_name,
                ';'.join(replaced_fields) if replaced_fields else '',
                'unknown',  # content_type is not known here
                '',         # headers
                ''          # row_count
            ])
    except Exception as e:
        logger.error(f"Error logging mapping: {str(e)}")


def log_replacements(log_dir: Path, file_name: str, mapping: dict):
    """Append one row per replacement to the replacements log."""
    try:
        with open(log_dir / "log_replacements.csv", 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            timestamp = datetime.now().isoformat()
            for original, replacement in mapping.items():
                # Column order must match the header:
                # timestamp, success, file_name, original, replacement
                writer.writerow([timestamp, True, file_name, original, replacement])
    except Exception as e:
        logger.error(f"Error logging replacements: {str(e)}")


def save_anonymized_data(data: Any, output_path: Path, file_type: str):
    """Save anonymized data to a file, dispatching on the target file type."""
    try:
        if file_type == '.csv':
            if isinstance(data, pd.DataFrame):
                data.to_csv(output_path, index=False, encoding='utf-8')
            else:
                raise ValueError("Data must be a DataFrame for CSV output")
        elif file_type == '.json':
            if isinstance(data, pd.DataFrame):
                data.to_json(output_path, orient='records', force_ascii=False)
            else:
                with open(output_path, 'w', encoding='utf-8') as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
        elif file_type == '.xml':
            if isinstance(data, str):
                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(data)
            else:
                raise ValueError("Data must be a string for XML output")
        elif file_type in ('.txt', '.docx'):
            if isinstance(data, str):
                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(data)
            else:
                raise ValueError("Data must be a string for text output")
        else:
            raise ValueError(f"Unsupported file type: {file_type}")
    except Exception as e:
        logger.error(f"Error saving anonymized data to {output_path}: {str(e)}")
        raise
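# Example call (illustrative; `df` is a hypothetical DataFrame produced by the
# anonymizer):
#
#     save_anonymized_data(df, OUTPUT_DIR / "neutralized_kunden.csv", ".csv")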


def read_file_content(file_path: Path) -> tuple[str, str]:
    """Read file content and return it together with the lower-cased extension."""
    try:
        file_type = file_path.suffix.lower()
        if file_type == '.docx':
            import docx  # lazy import: python-docx is only needed for Word files
            doc = docx.Document(file_path)
            content = '\n'.join(para.text for para in doc.paragraphs)
        else:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
        return content, file_type
    except Exception as e:
        logger.error(f"Error reading file {file_path}: {str(e)}")
        raise


def process_file(file_path: Path, anonymizer: DataAnonymizer) -> bool:
    """Process a single file and save an anonymized version."""
    try:
        # Read file content (handles .docx as well as plain-text formats)
        content, file_type = read_file_content(file_path)
        # Process content; process_content expects the extension without the dot
        result = anonymizer.process_content(content, file_type[1:])
        if result.data is None:
            logger.error(f"Failed to process {file_path.name}")
            return False
        # Save anonymized content with a neutralized_ prefix
        output_path = OUTPUT_DIR / f"neutralized_{file_path.name}"
        if file_type == '.csv':
            result.data.to_csv(output_path, index=False)
        elif file_type == '.json':
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(result.data, f, ensure_ascii=False, indent=2)
        elif file_type == '.xml':
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(result.data)
        else:
            # For text files, write the content as-is to preserve whitespace
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(result.data)
        # Log processing details
        timestamp = datetime.now().isoformat()
        success = True
        content_type = result.processed_info.get('type', 'unknown')
        log_entry = {
            'timestamp': timestamp,
            'success': success,
            'file_name': file_path.name,
            'replaced_fields': ';'.join(set(result.replaced_fields)),  # set removes duplicates
            'content_type': content_type,
            'headers': '',    # filled in below for table content
            'row_count': ''   # filled in below for table and text content
        }
        # Add type-specific details
        if content_type == 'table':
            log_entry['headers'] = ';'.join(result.processed_info['headers'])
            log_entry['row_count'] = result.processed_info['row_count']
        elif content_type == 'text':
            text_sections = result.processed_info.get('text_sections', [])
            log_entry['row_count'] = sum(s['length'] for s in text_sections)  # total text length
        # Append to the mapping log (header row was written by setup_directories)
        with open(LOG_MAPPING, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=list(log_entry.keys()))
            if f.tell() == 0:  # write header only if the file is empty
                writer.writeheader()
            writer.writerow(log_entry)
        # Append all replacements in a single pass over the mapping
        with open(LOG_REPLACEMENTS, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for original, replacement in result.mapping.items():
                writer.writerow([timestamp, success, file_path.name, original, replacement])
        return True
    except Exception as e:
        logger.error(f"Error processing {file_path.name}: {str(e)}")
        logger.debug(traceback.format_exc())
        return False


def main():
    # Set up output and log directories
    setup_directories()
    # Initialize the anonymizer
    anonymizer = DataAnonymizer()
    # Process all files from the configured input directory
    logger.info("Starting file processing...")
    if not INPUT_DIR.exists():
        logger.error(f"Input directory '{INPUT_DIR}' does not exist")
        return
    for file_path in INPUT_DIR.glob("*.*"):
        logger.info(f"Processing file: {file_path.name}")
        # process_file() catches its own exceptions and returns False on failure
        if process_file(file_path, anonymizer):
            logger.info(f"Anonymization completed for {file_path.name}")
        else:
            logger.error(f"Error processing {file_path.name}")
    logger.info("Processing completed!")


if __name__ == "__main__":
    main()
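
# Example run (a sketch; assumes an `input/` directory next to this script
# containing test files such as a hypothetical kunden.csv):
#
#     $ python apprun.py
#     ... - INFO - Output directory '.../output' created
#     ... - INFO - Processing file: kunden.csv
#     ... - INFO - Anonymization completed for kunden.csv
#     ... - INFO - Processing completed!
#
# Anonymized files are written to output/ as neutralized_<name>; mapping and
# replacement details are appended to logs/log_mapping.csv and
# logs/log_replacements.csv.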