263 lines
No EOL
9.5 KiB
Python
263 lines
No EOL
9.5 KiB
Python
"""
|
|
DSGVO-konformer Daten-Neutralisierer für KI-Agentensysteme
|
|
Unterstützt TXT, JSON, CSV, Excel und Word-Dateien
|
|
Mehrsprachig: DE, EN, FR, IT
|
|
"""
|
|
|
|
import os
|
|
import shutil
|
|
import logging
|
|
import csv
|
|
import json
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from neutralizer import DataAnonymizer
|
|
import traceback
|
|
|
|
# Define directories
|
|
SCRIPT_DIR = Path(__file__).parent
|
|
INPUT_DIR = SCRIPT_DIR / 'input'
|
|
OUTPUT_DIR = SCRIPT_DIR / 'output'
|
|
LOG_DIR = SCRIPT_DIR / 'logs'
|
|
LOG_MAPPING = LOG_DIR / 'log_mapping.csv'
|
|
LOG_REPLACEMENTS = LOG_DIR / 'log_replacements.csv'
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def setup_directories():
|
|
"""Setup output and log directories"""
|
|
# Close any existing log handlers
|
|
for handler in logger.handlers[:]:
|
|
handler.close()
|
|
logger.removeHandler(handler)
|
|
|
|
# Clear and recreate output directory
|
|
output_dir = Path("output")
|
|
if output_dir.exists():
|
|
shutil.rmtree(output_dir)
|
|
output_dir.mkdir()
|
|
logger.info(f"Output directory '{output_dir}' created")
|
|
|
|
# Clear and recreate logs directory
|
|
log_dir = Path("logs")
|
|
if log_dir.exists():
|
|
shutil.rmtree(log_dir)
|
|
log_dir.mkdir()
|
|
logger.info(f"Log directory '{log_dir}' created")
|
|
|
|
# Create log files
|
|
mapping_log = log_dir / "log_mapping.csv"
|
|
replacements_log = log_dir / "log_replacements.csv"
|
|
|
|
# Create headers for mapping log
|
|
with open(mapping_log, 'w', newline='', encoding='utf-8') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(['timestamp', 'success', 'file_name', 'replaced_fields', 'content_type', 'headers', 'row_count'])
|
|
|
|
# Create headers for replacements log
|
|
with open(replacements_log, 'w', newline='', encoding='utf-8') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(['timestamp', 'success', 'file_name', 'original', 'replacement'])
|
|
|
|
# Reconfigure logging with new log file
|
|
for handler in logger.handlers[:]:
|
|
handler.close()
|
|
logger.removeHandler(handler)
|
|
|
|
file_handler = logging.FileHandler(LOG_DIR / 'app.log', encoding='utf-8')
|
|
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
|
|
logger.addHandler(file_handler)
|
|
|
|
return output_dir, log_dir
|
|
|
|
def log_mapping(log_dir: Path, file_name: str, success: bool, replaced_fields: list):
|
|
"""Log mapping information"""
|
|
try:
|
|
with open(log_dir / "log_mapping.csv", 'a', newline='', encoding='utf-8') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow([
|
|
datetime.now().isoformat(),
|
|
file_name,
|
|
success,
|
|
';'.join(replaced_fields) if replaced_fields else '',
|
|
'unknown'
|
|
])
|
|
except Exception as e:
|
|
logger.error(f"Error logging mapping: {str(e)}")
|
|
|
|
def log_replacements(log_dir: Path, file_name: str, mapping: dict):
|
|
"""Log replacement information"""
|
|
try:
|
|
with open(log_dir / "log_replacements.csv", 'a', newline='', encoding='utf-8') as f:
|
|
writer = csv.writer(f)
|
|
for original, replacement in mapping.items():
|
|
writer.writerow([
|
|
datetime.now().isoformat(),
|
|
file_name,
|
|
original,
|
|
replacement
|
|
])
|
|
except Exception as e:
|
|
logger.error(f"Error logging replacements: {str(e)}")
|
|
|
|
def save_anonymized_data(data: any, output_path: Path, file_type: str):
|
|
"""Save anonymized data to file"""
|
|
try:
|
|
if file_type == '.csv':
|
|
if isinstance(data, pd.DataFrame):
|
|
data.to_csv(output_path, index=False, encoding='utf-8')
|
|
else:
|
|
raise ValueError("Data must be a DataFrame for CSV output")
|
|
elif file_type == '.json':
|
|
if isinstance(data, pd.DataFrame):
|
|
data.to_json(output_path, orient='records', force_ascii=False)
|
|
else:
|
|
with open(output_path, 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
elif file_type == '.xml':
|
|
if isinstance(data, str):
|
|
with open(output_path, 'w', encoding='utf-8') as f:
|
|
f.write(data)
|
|
else:
|
|
raise ValueError("Data must be a string for XML output")
|
|
elif file_type in ['.txt', '.docx']:
|
|
if isinstance(data, str):
|
|
with open(output_path, 'w', encoding='utf-8') as f:
|
|
f.write(data)
|
|
else:
|
|
raise ValueError("Data must be a string for text output")
|
|
else:
|
|
raise ValueError(f"Unsupported file type: {file_type}")
|
|
except Exception as e:
|
|
logger.error(f"Error saving anonymized data to {output_path}: {str(e)}")
|
|
raise
|
|
|
|
def read_file_content(file_path: Path) -> tuple[str, str]:
|
|
"""Read file content and determine content type"""
|
|
try:
|
|
file_type = file_path.suffix.lower()
|
|
if file_type == '.docx':
|
|
import docx
|
|
doc = docx.Document(file_path)
|
|
content = '\n'.join(para.text for para in doc.paragraphs)
|
|
else:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
return content, file_type
|
|
except Exception as e:
|
|
logger.error(f"Error reading file {file_path}: {str(e)}")
|
|
raise
|
|
|
|
def process_file(file_path: Path, anonymizer: DataAnonymizer) -> bool:
|
|
"""Process a single file and save anonymized version"""
|
|
try:
|
|
# Read file content
|
|
content = file_path.read_text(encoding='utf-8')
|
|
|
|
# Process content
|
|
result = anonymizer.process_content(content, file_path.suffix[1:])
|
|
|
|
if result.data is None:
|
|
logger.error(f"Failed to process {file_path.name}")
|
|
return False
|
|
|
|
# Save anonymized content with neutralized_ prefix
|
|
output_path = OUTPUT_DIR / f"neutralized_{file_path.name}"
|
|
if file_path.suffix.lower() == '.csv':
|
|
result.data.to_csv(output_path, index=False)
|
|
elif file_path.suffix.lower() == '.json':
|
|
with open(output_path, 'w', encoding='utf-8') as f:
|
|
json.dump(result.data, f, indent=2)
|
|
elif file_path.suffix.lower() == '.xml':
|
|
with open(output_path, 'w', encoding='utf-8') as f:
|
|
f.write(result.data)
|
|
else:
|
|
# For text files, preserve original whitespace
|
|
with open(output_path, 'w', encoding='utf-8') as f:
|
|
f.write(result.data)
|
|
|
|
# Log processing details
|
|
timestamp = datetime.now().isoformat()
|
|
success = True
|
|
|
|
# Create detailed log entry
|
|
log_entry = {
|
|
'timestamp': timestamp,
|
|
'success': success,
|
|
'file_name': file_path.name,
|
|
'replaced_fields': ';'.join(set(result.replaced_fields)), # Use set to remove duplicates
|
|
'content_type': result.processed_info.get('type', 'unknown')
|
|
}
|
|
|
|
# Add type-specific details
|
|
if result.processed_info['type'] == 'table':
|
|
log_entry.update({
|
|
'headers': ';'.join(result.processed_info['headers']),
|
|
'row_count': result.processed_info['row_count']
|
|
})
|
|
elif result.processed_info['type'] == 'text':
|
|
tables = result.processed_info.get('tables', [])
|
|
text_sections = result.processed_info.get('text_sections', [])
|
|
log_entry.update({
|
|
'headers': '', # Empty for text files
|
|
'row_count': sum(s['length'] for s in text_sections) # Total text length
|
|
})
|
|
|
|
# Write to log file
|
|
with open(LOG_MAPPING, 'a', newline='', encoding='utf-8') as f:
|
|
writer = csv.DictWriter(f, fieldnames=log_entry.keys())
|
|
if f.tell() == 0: # Write header if file is empty
|
|
writer.writeheader()
|
|
writer.writerow(log_entry)
|
|
|
|
# Log replacements
|
|
for original, replacement in result.mapping.items():
|
|
with open(LOG_REPLACEMENTS, 'a', newline='', encoding='utf-8') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow([timestamp, success, file_path.name, original, replacement])
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing {file_path.name}: {str(e)}")
|
|
logger.debug(traceback.format_exc())
|
|
return False
|
|
|
|
def main():
|
|
# Setup directories
|
|
setup_directories()
|
|
|
|
# Initialize anonymizer
|
|
anonymizer = DataAnonymizer()
|
|
|
|
# Process all files
|
|
logger.info("Starting file processing...")
|
|
testdata_dir = Path("testdata")
|
|
|
|
for file_path in testdata_dir.glob("*.*"):
|
|
try:
|
|
logger.info(f"Processing file: {file_path.name}")
|
|
|
|
# Process file
|
|
if process_file(file_path, anonymizer):
|
|
logger.info(f"Anonymization completed for {file_path.name}")
|
|
else:
|
|
logger.error(f"Error processing {file_path.name}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing {file_path.name}: {str(e)}")
|
|
continue
|
|
|
|
logger.info("Processing completed!")
|
|
|
|
if __name__ == "__main__":
|
|
main() |