"""
GDPR-compliant data neutralizer for AI agent systems.

Supports TXT, JSON, CSV, Excel and Word files.
Multilingual: DE, EN, FR, IT.
"""

import json
import logging
import math
import re
import traceback
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from io import StringIO
from typing import Dict, List, Tuple, Any, Union, Optional
from xml.sax.saxutils import escape

import pandas as pd

from patterns import Pattern, HeaderPatterns, DataPatterns, get_pattern_for_header, find_patterns_in_text, TextTablePatterns

# Configure logging
logger = logging.getLogger(__name__)

# Matches text that already looks like a placeholder (e.g. "[EMAIL_1]") so it
# is not anonymized a second time.
_PLACEHOLDER_RE = re.compile(r'\[[A-Z_]+\d+\]')


@dataclass
class TableData:
    """Represents tabular data."""
    headers: List[str]
    rows: List[List[str]]
    source_type: str  # 'csv', 'json', 'xml', 'text_table'


@dataclass
class PlainText:
    """Represents plain (non-tabular) text."""
    content: str
    source_type: str  # 'txt', 'docx', 'text_plain'


@dataclass
class ProcessResult:
    """Result of content processing."""
    data: Any
    mapping: Dict[str, str]
    replaced_fields: List[str]
    processed_info: Dict[str, Any]  # Additional processing information


class DataAnonymizer:
    """Main class for data anonymization.

    The instance keeps a single ``mapping`` (original value -> placeholder)
    that is shared by every call, so the same original value always receives
    the same placeholder for the lifetime of the instance.
    """

    def __init__(self):
        """Initialize the anonymizer with patterns."""
        self.header_patterns = HeaderPatterns.patterns
        self.data_patterns = DataPatterns.patterns
        self.replaced_fields = set()
        self.mapping = {}
        self.processing_info = []

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    def _placeholder_for(self, original: str, pattern: Any) -> str:
        """Return the placeholder for ``original``, creating it on first use.

        The placeholder is built from ``pattern.replacement_template``,
        formatted with the next running number (current mapping size + 1).
        """
        if original not in self.mapping:
            self.mapping[original] = pattern.replacement_template.format(len(self.mapping) + 1)
        return self.mapping[original]

    def _find_data_pattern(self, pattern_name: str) -> Any:
        """Return the first data pattern named ``pattern_name``, or None."""
        return next((p for p in self.data_patterns if p.name == pattern_name), None)

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None and for float NaN (pandas' empty-cell marker)."""
        return value is None or (isinstance(value, float) and math.isnan(value))

    # ------------------------------------------------------------------
    # Text helpers
    # ------------------------------------------------------------------
    def _normalize_whitespace(self, text: str) -> str:
        """Collapse every whitespace run to a single space and strip the ends.

        Line breaks are whitespace too, so they are collapsed as well; a
        separate newline normalization after this step would never match.
        """
        return re.sub(r'\s+', ' ', text).strip()

    def _is_table_line(self, line: str) -> bool:
        """Check if a line looks like a ``key: value`` or tab-separated pair."""
        return bool(re.match(r'^\s*[^:]+:\s*[^:]+$', line) or
                    re.match(r'^\s*[^\t]+\t[^\t]+$', line))

    def _extract_tables_from_text(self, content: str) -> Tuple[List[TableData], List[PlainText]]:
        """
        Extract tables and plain text from content.

        Currently no table detection is performed: the whole content is
        returned as a single plain-text section and the table list is empty.

        Args:
            content: Content to process

        Returns:
            Tuple of (list of tables, list of plain text sections)
        """
        tables = []
        plain_texts = []

        # Process the entire content as plain text
        plain_texts.append(PlainText(content=content, source_type='text_plain'))

        return tables, plain_texts

    # ------------------------------------------------------------------
    # Anonymizers
    # ------------------------------------------------------------------
    def _anonymize_table(self, table: TableData) -> TableData:
        """Return an anonymized copy of ``table``.

        Every column whose header matches a header pattern has its cells
        replaced by placeholders. Missing cells (None / NaN) are left as-is
        so that empty CSV cells are not turned into a placeholder for 'nan'.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            anonymized_table = TableData(
                headers=table.headers.copy(),
                rows=[row.copy() for row in table.rows],
                source_type=table.source_type
            )

            for i, header in enumerate(anonymized_table.headers):
                pattern = get_pattern_for_header(header, self.header_patterns)
                if not pattern:
                    continue
                for row in anonymized_table.rows:
                    if self._is_missing(row[i]):
                        continue
                    row[i] = self._placeholder_for(str(row[i]), pattern)

            return anonymized_table

        except Exception as e:
            logger.error("Error anonymizing table: %s", e)
            logger.debug(traceback.format_exc())
            raise

    def _anonymize_plain_text(self, text: PlainText) -> PlainText:
        """Return a copy of ``text`` with every data-pattern match replaced.

        Matches are expected as ``(pattern_name, matched_text, start, end)``
        tuples. Matches that already look like placeholders, or whose pattern
        cannot be found, are ignored. Of overlapping matches only the one
        that starts first (longest on ties) is replaced, because splicing
        overlapping spans by their original offsets would corrupt the text.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            # Process the entire text at once instead of line by line
            current_text = text.content
            matches = find_patterns_in_text(current_text, self.data_patterns)

            # Keep only usable, non-overlapping matches (scanned left to right).
            selected = []
            last_end = 0
            for pattern_name, matched_text, start, end in sorted(
                    matches, key=lambda m: (m[2], -(m[3] - m[2]))):
                # Skip if the matched text is already a placeholder
                if _PLACEHOLDER_RE.match(matched_text):
                    continue
                pattern = self._find_data_pattern(pattern_name)
                if not pattern:
                    continue
                if start < last_end:
                    continue  # overlaps a match that was already accepted
                selected.append((pattern, matched_text, start, end))
                last_end = end

            # Replace back to front so earlier offsets stay valid.
            # NOTE: matched values are deliberately never printed or logged;
            # they are the personal data this class exists to remove.
            for pattern, matched_text, start, end in reversed(selected):
                replacement = self._placeholder_for(matched_text, pattern)
                current_text = current_text[:start] + replacement + current_text[end:]

            return PlainText(content=current_text, source_type=text.source_type)

        except Exception as e:
            logger.error("Error anonymizing plain text: %s", e)
            logger.debug(traceback.format_exc())
            raise

    def _anonymize_json_value(self, value: Any, key: Optional[str] = None) -> Any:
        """
        Recursively anonymize JSON values based on their keys and content.

        Strings are replaced when their key matches a header pattern, or
        when their content matches a data pattern (the first match decides
        the pattern). List items inherit the key of the list, so strings in
        ``{"emails": [...]}`` are treated like a string stored under
        ``"emails"``. Non-string scalars are returned unchanged.

        Args:
            value: Value to anonymize
            key: Key name (if part of a key-value pair)

        Returns:
            Anonymized value
        """
        if isinstance(value, dict):
            return {k: self._anonymize_json_value(v, k) for k, v in value.items()}

        if isinstance(value, list):
            return [self._anonymize_json_value(item, key) for item in value]

        if not isinstance(value, str):
            return value

        # Check if this is a key we should process
        if key:
            pattern = get_pattern_for_header(key, self.header_patterns)
            if pattern:
                return self._placeholder_for(value, pattern)

        # Check if the value itself matches any patterns
        matches = find_patterns_in_text(value, self.data_patterns)
        if matches:
            # Use the first match's pattern
            pattern_name = matches[0][0]
            pattern = self._find_data_pattern(pattern_name)
            if pattern:
                # Same placeholder format as the XML and plain-text paths.
                return self._placeholder_for(value, pattern)
            # Fallback when the pattern object cannot be resolved by name.
            if value not in self.mapping:
                self.mapping[value] = f"{pattern_name.upper()}_{len(self.mapping) + 1}"
            return self.mapping[value]

        return value

    def _anonymize_xml_element(self, element: ET.Element, indent: str = '') -> str:
        """
        Recursively process an XML element and return a formatted string.

        Attribute values are replaced when the attribute name matches a
        header pattern or the value matches a data pattern; element text is
        replaced when it matches a data pattern. Attribute values and text
        are XML-escaped on output. ``element.tail`` is not read, so text
        that follows a child element is not part of the output.

        Args:
            element: XML element to process
            indent: Current indentation level

        Returns:
            Formatted XML string
        """
        # Process attributes
        processed_attrs = {}
        for attr_name, attr_value in element.attrib.items():
            # Header pattern on the attribute name wins; otherwise fall back
            # to the first data pattern matching the attribute value.
            pattern = get_pattern_for_header(attr_name, self.header_patterns)
            if not pattern:
                matches = find_patterns_in_text(attr_value, self.data_patterns)
                if matches:
                    pattern = self._find_data_pattern(matches[0][0])
            if pattern:
                processed_attrs[attr_name] = self._placeholder_for(attr_value, pattern)
            else:
                processed_attrs[attr_name] = attr_value

        attrs = ' '.join(
            '{}="{}"'.format(k, escape(str(v), {'"': '&quot;'}))
            for k, v in processed_attrs.items()
        )
        attrs = f' {attrs}' if attrs else ''

        # Process text content
        text = element.text.strip() if element.text and element.text.strip() else ''
        if text:
            matches = find_patterns_in_text(text, self.data_patterns)
            if matches:
                pattern = self._find_data_pattern(matches[0][0])
                if pattern:
                    text = self._placeholder_for(text, pattern)
        text = escape(text)

        # Process child elements
        children = [self._anonymize_xml_element(child, indent + ' ') for child in element]

        # Build element string
        if not children and not text:
            return f"{indent}<{element.tag}{attrs}/>"
        if not children:
            return f"{indent}<{element.tag}{attrs}>{text}</{element.tag}>"

        result = [f"{indent}<{element.tag}{attrs}>"]
        if text:
            result.append(f"{indent} {text}")
        result.extend(children)
        result.append(f"{indent}</{element.tag}>")
        return '\n'.join(result)

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------
    def process_content(self, content: str, content_type: str) -> ProcessResult:
        """
        Process content and return anonymized data.

        Args:
            content: Content to process
            content_type: Type of content ('csv', 'json', 'xml'); any other
                value is handled as plain text

        Returns:
            ProcessResult: Contains anonymized data, mapping, replaced fields
            and processing info. ``data`` is a DataFrame for CSV, the
            anonymized Python structure for JSON and a string for XML and
            text. On any error ``data`` is None and ``processed_info`` is
            ``{'type': 'error', 'error': <message>}``; no exception escapes.
        """
        try:
            replaced_fields = []
            processed_info = {}

            if content_type == 'json':
                data = json.loads(content)
                # Process JSON recursively
                result = self._anonymize_json_value(data)
                processed_info['type'] = 'json'
                return ProcessResult(result, self.mapping, replaced_fields, processed_info)

            if content_type == 'xml':
                # NOTE(review): ElementTree does not protect against
                # maliciously constructed XML — confirm the input is trusted.
                root = ET.fromstring(content)
                # Process XML recursively with proper formatting
                result = self._anonymize_xml_element(root)
                processed_info['type'] = 'xml'
                return ProcessResult(result, self.mapping, replaced_fields, processed_info)

            if content_type == 'csv':
                # Handle as table
                df = pd.read_csv(StringIO(content))
                table = TableData(
                    headers=df.columns.tolist(),
                    rows=df.values.tolist(),
                    source_type='csv'
                )
                processed_info['type'] = 'table'
                processed_info['headers'] = table.headers
                processed_info['row_count'] = len(table.rows)

                if not table.rows:
                    return ProcessResult(None, self.mapping, [], processed_info)

                anonymized_table = self._anonymize_table(table)

                # Track replaced fields: each header once, if any cell changed.
                # Unchanged cells are the very same object in both tables; the
                # identity check keeps NaN (which is != itself) from counting.
                for i, header in enumerate(anonymized_table.headers):
                    changed = any(
                        anon_row[i] is not orig_row[i] and anon_row[i] != orig_row[i]
                        for orig_row, anon_row in zip(table.rows, anonymized_table.rows)
                    )
                    if changed and header not in replaced_fields:
                        replaced_fields.append(header)

                result = pd.DataFrame(anonymized_table.rows, columns=anonymized_table.headers)
                return ProcessResult(result, self.mapping, replaced_fields, processed_info)

            # Handle as text
            tables, plain_texts = self._extract_tables_from_text(content)
            processed_info['type'] = 'text'
            processed_info['tables'] = [{'headers': t.headers, 'row_count': len(t.rows)} for t in tables]

            # Process plain text sections
            anonymized_texts = [self._anonymize_plain_text(text) for text in plain_texts]

            # Combine all processed content
            result = content
            for text, anonymized_text in zip(plain_texts, anonymized_texts):
                if text.content != anonymized_text.content:
                    result = result.replace(text.content, anonymized_text.content)

            return ProcessResult(result, self.mapping, replaced_fields, processed_info)

        except Exception as e:
            logger.error("Error processing content: %s", e)
            logger.debug(traceback.format_exc())
            return ProcessResult(None, self.mapping, [], {'type': 'error', 'error': str(e)})