""" List processing module for data anonymization Handles structured data with headers (CSV, JSON, XML) """ import json import pandas as pd import xml.etree.ElementTree as ET from typing import Dict, List, Any, Union from dataclasses import dataclass from io import StringIO from modules.services.serviceNeutralization.subParseString import StringParser from modules.services.serviceNeutralization.subPatterns import get_pattern_for_header, HeaderPatterns @dataclass class TableData: """Repräsentiert Tabellendaten""" headers: List[str] rows: List[List[str]] source_type: str # 'csv', 'json', 'xml', 'text_table' class ListProcessor: """Handles structured data processing with headers for anonymization""" def __init__(self, names_to_parse: List[str] = None): """ Initialize the list processor Args: names_to_parse: List of names to parse and replace """ self.string_parser = StringParser(names_to_parse) self.header_patterns = HeaderPatterns.patterns def anonymize_table(self, table: TableData) -> TableData: """ Anonymize table data based on headers Args: table: TableData object to anonymize Returns: TableData: Anonymized table """ anonymized_table = TableData( headers=table.headers.copy(), rows=[row.copy() for row in table.rows], source_type=table.source_type ) for i, header in enumerate(anonymized_table.headers): pattern = get_pattern_for_header(header, self.header_patterns) if pattern: for row in anonymized_table.rows: if row[i] is not None: original = str(row[i]) if original not in self.string_parser.mapping: # Generate a UUID for the placeholder import uuid placeholder_id = str(uuid.uuid4()) self.string_parser.mapping[original] = pattern.replacement_template.format(len(self.string_parser.mapping) + 1) row[i] = self.string_parser.mapping[original] return anonymized_table def process_csv_content(self, content: str) -> tuple: """ Process CSV content Args: content: CSV content to process Returns: Tuple of (processed_data, mapping, replaced_fields, processed_info) """ df = pd.read_csv(StringIO(content), encoding='utf-8') table = TableData( headers=df.columns.tolist(), rows=df.values.tolist(), source_type='csv' ) if not table.rows: return None, self.string_parser.get_mapping(), [], {'type': 'table', 'headers': table.headers, 'row_count': 0} anonymized_table = self.anonymize_table(table) # Track replaced fields replaced_fields = [] for i, header in enumerate(anonymized_table.headers): for orig_row, anon_row in zip(table.rows, anonymized_table.rows): if anon_row[i] != orig_row[i]: replaced_fields.append(header) # Convert back to DataFrame result = pd.DataFrame(anonymized_table.rows, columns=anonymized_table.headers) processed_info = { 'type': 'table', 'headers': table.headers, 'row_count': len(table.rows) } return result, self.string_parser.get_mapping(), replaced_fields, processed_info def process_json_content(self, content: str) -> tuple: """ Process JSON content Args: content: JSON content to process Returns: Tuple of (processed_data, mapping, replaced_fields, processed_info) """ data = json.loads(content) # Process JSON recursively using string parser result = self.string_parser.process_json_value(data) processed_info = {'type': 'json'} return result, self.string_parser.get_mapping(), [], processed_info def anonymize_xml_element(self, element: ET.Element, indent: str = '') -> str: """ Recursively process XML element and return formatted string Args: element: XML element to process indent: Current indentation level Returns: Formatted XML string """ # Process attributes processed_attrs = {} for attr_name, attr_value in element.attrib.items(): # Check if attribute name matches any header patterns pattern = get_pattern_for_header(attr_name, self.header_patterns) if pattern: if attr_value not in self.string_parser.mapping: # Generate a UUID for the placeholder import uuid placeholder_id = str(uuid.uuid4()) # Create placeholder in format [type.uuid] type_mapping = { 'email': 'email', 'phone': 'phone', 'name': 'name', 'address': 'address', 'id': 'id' } placeholder_type = type_mapping.get(pattern.name, 'data') self.string_parser.mapping[attr_value] = f"[{placeholder_type}.{placeholder_id}]" processed_attrs[attr_name] = self.string_parser.mapping[attr_value] else: # Check if attribute value matches any data patterns from modules.services.serviceNeutralization.subPatterns import find_patterns_in_text, DataPatterns matches = find_patterns_in_text(attr_value, DataPatterns.patterns) if matches: pattern_name = matches[0][0] pattern = next((p for p in DataPatterns.patterns if p.name == pattern_name), None) if pattern: if attr_value not in self.string_parser.mapping: # Generate a UUID for the placeholder import uuid placeholder_id = str(uuid.uuid4()) # Create placeholder in format [type.uuid] type_mapping = { 'email': 'email', 'phone': 'phone', 'name': 'name', 'address': 'address', 'id': 'id' } placeholder_type = type_mapping.get(pattern_name, 'data') self.string_parser.mapping[attr_value] = f"[{placeholder_type}.{placeholder_id}]" processed_attrs[attr_name] = self.string_parser.mapping[attr_value] else: processed_attrs[attr_name] = attr_value else: processed_attrs[attr_name] = attr_value attrs = ' '.join(f'{k}="{v}"' for k, v in processed_attrs.items()) attrs = f' {attrs}' if attrs else '' # Process text content text = element.text.strip() if element.text and element.text.strip() else '' if text: # Skip if already a placeholder if not self.string_parser.is_placeholder(text): # Check if text matches any patterns from modules.services.serviceNeutralization.subPatterns import find_patterns_in_text, DataPatterns pattern_matches = find_patterns_in_text(text, DataPatterns.patterns) if pattern_matches: pattern_name = pattern_matches[0][0] pattern = next((p for p in DataPatterns.patterns if p.name == pattern_name), None) if pattern: if text not in self.string_parser.mapping: # Generate a UUID for the placeholder import uuid placeholder_id = str(uuid.uuid4()) # Create placeholder in format [type.uuid] type_mapping = { 'email': 'email', 'phone': 'phone', 'name': 'name', 'address': 'address', 'id': 'id' } placeholder_type = type_mapping.get(pattern_name, 'data') self.string_parser.mapping[text] = f"[{placeholder_type}.{placeholder_id}]" text = self.string_parser.mapping[text] else: # Check if text matches any custom names from the user list for name in self.string_parser.names_to_parse: if not name.strip(): continue if text.lower().strip() == name.lower().strip(): if text not in self.string_parser.mapping: # Generate a UUID for the placeholder import uuid placeholder_id = str(uuid.uuid4()) self.string_parser.mapping[text] = f"[name.{placeholder_id}]" text = self.string_parser.mapping[text] break # Process child elements children = [] for child in element: child_str = self.anonymize_xml_element(child, indent + ' ') children.append(child_str) # Build element string if not children and not text: return f"{indent}<{element.tag}{attrs}/>" elif not children: return f"{indent}<{element.tag}{attrs}>{text}" else: result = [f"{indent}<{element.tag}{attrs}>"] if text: result.append(f"{indent} {text}") result.extend(children) result.append(f"{indent}") return '\n'.join(result) def process_xml_content(self, content: str) -> tuple: """ Process XML content Args: content: XML content to process Returns: Tuple of (processed_data, mapping, replaced_fields, processed_info) """ root = ET.fromstring(content) # Process XML recursively with proper formatting result = self.anonymize_xml_element(root) processed_info = {'type': 'xml'} return result, self.string_parser.get_mapping(), [], processed_info def get_mapping(self) -> Dict[str, str]: """ Get the current mapping of original values to placeholders Returns: Dict[str, str]: Mapping dictionary """ return self.string_parser.get_mapping() def clear_mapping(self): """Clear the current mapping""" self.string_parser.clear_mapping()