""" DSGVO-konformer Daten-Neutralisierer für KI-Agentensysteme Unterstützt TXT, JSON, CSV, Excel und Word-Dateien Mehrsprachig: DE, EN, FR, IT """ import os import shutil import logging import csv import json import pandas as pd from datetime import datetime from pathlib import Path from neutralizer import DataAnonymizer import traceback # Define directories SCRIPT_DIR = Path(__file__).parent INPUT_DIR = SCRIPT_DIR / 'input' OUTPUT_DIR = SCRIPT_DIR / 'output' LOG_DIR = SCRIPT_DIR / 'logs' LOG_MAPPING = LOG_DIR / 'log_mapping.csv' LOG_REPLACEMENTS = LOG_DIR / 'log_replacements.csv' # Configure logging logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler() ] ) logger = logging.getLogger(__name__) def setup_directories(): """Setup output and log directories""" # Close any existing log handlers for handler in logger.handlers[:]: handler.close() logger.removeHandler(handler) # Clear and recreate output directory output_dir = Path("output") if output_dir.exists(): shutil.rmtree(output_dir) output_dir.mkdir() logger.info(f"Output directory '{output_dir}' created") # Clear and recreate logs directory log_dir = Path("logs") if log_dir.exists(): shutil.rmtree(log_dir) log_dir.mkdir() logger.info(f"Log directory '{log_dir}' created") # Create log files mapping_log = log_dir / "log_mapping.csv" replacements_log = log_dir / "log_replacements.csv" # Create headers for mapping log with open(mapping_log, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['timestamp', 'success', 'file_name', 'replaced_fields', 'content_type', 'headers', 'row_count']) # Create headers for replacements log with open(replacements_log, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['timestamp', 'success', 'file_name', 'original', 'replacement']) # Reconfigure logging with new log file for handler in logger.handlers[:]: handler.close() logger.removeHandler(handler) file_handler = logging.FileHandler(LOG_DIR / 'app.log', encoding='utf-8') file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(file_handler) return output_dir, log_dir def log_mapping(log_dir: Path, file_name: str, success: bool, replaced_fields: list): """Log mapping information""" try: with open(log_dir / "log_mapping.csv", 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow([ datetime.now().isoformat(), file_name, success, ';'.join(replaced_fields) if replaced_fields else '', 'unknown' ]) except Exception as e: logger.error(f"Error logging mapping: {str(e)}") def log_replacements(log_dir: Path, file_name: str, mapping: dict): """Log replacement information""" try: with open(log_dir / "log_replacements.csv", 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f) for original, replacement in mapping.items(): writer.writerow([ datetime.now().isoformat(), file_name, original, replacement ]) except Exception as e: logger.error(f"Error logging replacements: {str(e)}") def save_anonymized_data(data: any, output_path: Path, file_type: str): """Save anonymized data to file""" try: if file_type == '.csv': if isinstance(data, pd.DataFrame): data.to_csv(output_path, index=False, encoding='utf-8') else: raise ValueError("Data must be a DataFrame for CSV output") elif file_type == '.json': if isinstance(data, pd.DataFrame): data.to_json(output_path, orient='records', force_ascii=False) else: with open(output_path, 'w', 
        elif file_type == '.xml':
            if isinstance(data, str):
                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(data)
            else:
                raise ValueError("Data must be a string for XML output")
        elif file_type in ['.txt', '.docx']:
            # Note: anonymized .docx content is written out as plain text
            if isinstance(data, str):
                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(data)
            else:
                raise ValueError("Data must be a string for text output")
        else:
            raise ValueError(f"Unsupported file type: {file_type}")
    except Exception as e:
        logger.error(f"Error saving anonymized data to {output_path}: {str(e)}")
        raise


def read_file_content(file_path: Path) -> tuple[str, str]:
    """Read file content and determine content type."""
    try:
        file_type = file_path.suffix.lower()
        if file_type == '.docx':
            import docx  # lazy import: python-docx is only needed for Word files
            doc = docx.Document(file_path)
            content = '\n'.join(para.text for para in doc.paragraphs)
        else:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
        return content, file_type
    except Exception as e:
        logger.error(f"Error reading file {file_path}: {str(e)}")
        raise


def process_file(file_path: Path, anonymizer: DataAnonymizer) -> bool:
    """Process a single file and save the anonymized version."""
    try:
        # Read file content (handles .docx via python-docx, everything else as text)
        content, file_type = read_file_content(file_path)

        # Process content
        result = anonymizer.process_content(content, file_type[1:])
        if result.data is None:
            logger.error(f"Failed to process {file_path.name}")
            return False

        # Save anonymized content with a neutralized_ prefix
        output_path = OUTPUT_DIR / f"neutralized_{file_path.name}"
        save_anonymized_data(result.data, output_path, file_type)

        # Log processing details
        timestamp = datetime.now().isoformat()
        success = True

        # Create detailed log entry; a set removes duplicate field names
        log_entry = {
            'timestamp': timestamp,
            'success': success,
            'file_name': file_path.name,
            'replaced_fields': ';'.join(set(result.replaced_fields)),
            'content_type': result.processed_info.get('type', 'unknown'),
            'headers': '',
            'row_count': ''
        }

        # Add type-specific details
        if result.processed_info.get('type') == 'table':
            log_entry['headers'] = ';'.join(result.processed_info['headers'])
            log_entry['row_count'] = result.processed_info['row_count']
        elif result.processed_info.get('type') == 'text':
            # No headers for text files; row_count carries the total text length
            text_sections = result.processed_info.get('text_sections', [])
            log_entry['row_count'] = sum(s['length'] for s in text_sections)

        # Append to the mapping log (the header row is written by setup_directories)
        with open(LOG_MAPPING, 'a', newline='', encoding='utf-8') as f:
            csv.DictWriter(f, fieldnames=MAPPING_FIELDS).writerow(log_entry)

        # Log replacements
        with open(LOG_REPLACEMENTS, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for original, replacement in result.mapping.items():
                writer.writerow([timestamp, success, file_path.name, original, replacement])

        return True

    except Exception as e:
        logger.error(f"Error processing {file_path.name}: {str(e)}")
        logger.debug(traceback.format_exc())
        return False
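

# For orientation: the shape of the object returned by
# DataAnonymizer.process_content(), as inferred from its use in process_file()
# above. This is an assumed interface sketch, not neutralizer's actual definition:
#
#   result.data            -- anonymized content (DataFrame, dict/list, or str)
#   result.mapping         -- dict mapping each original value to its replacement
#   result.replaced_fields -- list of field names that were anonymized
#   result.processed_info  -- dict with 'type' ('table' or 'text') plus either
#                             'headers'/'row_count' or 'text_sections' details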


def main():
    # Setup directories
    setup_directories()

    # Initialize anonymizer
    anonymizer = DataAnonymizer()

    # Process all files from the input directory
    logger.info("Starting file processing...")
    for file_path in INPUT_DIR.glob("*.*"):
        logger.info(f"Processing file: {file_path.name}")
        # process_file catches its own exceptions and reports failure via False
        if process_file(file_path, anonymizer):
            logger.info(f"Anonymization completed for {file_path.name}")
        else:
            logger.error(f"Error processing {file_path.name}")

    logger.info("Processing completed!")


if __name__ == "__main__":
    main()
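
# Example run (a sketch; the script and input file names below are hypothetical):
#
#   $ python neutralize_main.py
#
# Expected results, given an input/ directory next to this script:
#   output/neutralized_customers.csv  -- one anonymized copy per input file
#   logs/app.log                      -- processing log
#   logs/log_mapping.csv              -- per-file summary of replaced fields
#   logs/log_replacements.csv         -- original -> replacement pairs per value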