# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ List processing module for data anonymization Handles structured data with headers (CSV, JSON, XML) """ import json import pandas as pd import xml.etree.ElementTree as ET from typing import Dict, List, Any, Union from dataclasses import dataclass from io import StringIO from .subParseString import StringParser from .subPatterns import getPatternForHeader, HeaderPatterns @dataclass class TableData: """Repräsentiert Tabellendaten""" headers: List[str] rows: List[List[str]] source_type: str # 'csv', 'json', 'xml', 'text_table' class ListProcessor: """Handles structured data processing with headers for anonymization""" def __init__(self, NamesToParse: List[str] = None): """ Initialize the list processor Args: NamesToParse: List of names to parse and replace """ self.string_parser = StringParser(NamesToParse) self.header_patterns = HeaderPatterns.patterns def _anonymizeTable(self, table: TableData) -> TableData: """ Anonymize table data based on headers Args: table: TableData object to anonymize Returns: TableData: Anonymized table """ anonymizedTable = TableData( headers=table.headers.copy(), rows=[row.copy() for row in table.rows], source_type=table.source_type ) for i, header in enumerate(anonymizedTable.headers): pattern = getPatternForHeader(header, self.header_patterns) if pattern: for row in anonymizedTable.rows: if row[i] is not None: original = str(row[i]) if original not in self.string_parser.mapping: # Generate a UUID for the placeholder import uuid placeholderId = str(uuid.uuid4()) self.string_parser.mapping[original] = pattern.replacement_template.format(len(self.string_parser.mapping) + 1) row[i] = self.string_parser.mapping[original] return anonymizedTable def processCsvContent(self, content: str) -> tuple: """ Process CSV content Args: content: CSV content to process Returns: Tuple of (processed_data, mapping, replaced_fields, processed_info) """ df = pd.read_csv(StringIO(content), encoding='utf-8') table = TableData( headers=df.columns.tolist(), rows=df.values.tolist(), source_type='csv' ) if not table.rows: return None, self.string_parser.getMapping(), [], {'type': 'table', 'headers': table.headers, 'row_count': 0} anonymizedTable = self._anonymizeTable(table) # Track replaced fields replacedFields = [] for i, header in enumerate(anonymizedTable.headers): for origRow, anonRow in zip(table.rows, anonymizedTable.rows): if anonRow[i] != origRow[i]: replacedFields.append(header) # Convert back to DataFrame result = pd.DataFrame(anonymizedTable.rows, columns=anonymizedTable.headers) processedInfo = { 'type': 'table', 'headers': table.headers, 'row_count': len(table.rows) } return result, self.string_parser.getMapping(), replacedFields, processedInfo def processJsonContent(self, content: str) -> tuple: """ Process JSON content Args: content: JSON content to process Returns: Tuple of (processed_data, mapping, replaced_fields, processed_info) """ data = json.loads(content) # Process JSON recursively using string parser result = self.string_parser.processJsonValue(data) processedInfo = {'type': 'json'} return result, self.string_parser.getMapping(), [], processedInfo def _anonymizeXmlElement(self, element: ET.Element, indent: str = '') -> str: """ Recursively process XML element and return formatted string Args: element: XML element to process indent: Current indentation level Returns: Formatted XML string """ # Process attributes processedAttrs = {} for attrName, attrValue in element.attrib.items(): # Check if attribute name matches any header patterns pattern = getPatternForHeader(attrName, self.header_patterns) if pattern: if attrValue not in self.string_parser.mapping: # Generate a UUID for the placeholder import uuid placeholderId = str(uuid.uuid4()) # Create placeholder in format [type.uuid] typeMapping = { 'email': 'email', 'phone': 'phone', 'name': 'name', 'address': 'address', 'id': 'id' } placeholderType = typeMapping.get(pattern.name, 'data') self.string_parser.mapping[attrValue] = f"[{placeholderType}.{placeholderId}]" processedAttrs[attrName] = self.string_parser.mapping[attrValue] else: # Check if attribute value matches any data patterns from .subPatterns import findPatternsInText, DataPatterns matches = findPatternsInText(attrValue, DataPatterns.patterns) if matches: patternName = matches[0][0] pattern = next((p for p in DataPatterns.patterns if p.name == patternName), None) if pattern: if attrValue not in self.string_parser.mapping: # Generate a UUID for the placeholder import uuid placeholderId = str(uuid.uuid4()) # Create placeholder in format [type.uuid] typeMapping = { 'email': 'email', 'phone': 'phone', 'name': 'name', 'address': 'address', 'id': 'id' } placeholderType = typeMapping.get(patternName, 'data') self.string_parser.mapping[attrValue] = f"[{placeholderType}.{placeholderId}]" processedAttrs[attrName] = self.string_parser.mapping[attrValue] else: processedAttrs[attrName] = attrValue else: processedAttrs[attrName] = attrValue attrs = ' '.join(f'{k}="{v}"' for k, v in processedAttrs.items()) attrs = f' {attrs}' if attrs else '' # Process text content text = element.text.strip() if element.text and element.text.strip() else '' if text: # Skip if already a placeholder if not self.string_parser._isPlaceholder(text): # Check if text matches any patterns from .subPatterns import findPatternsInText, DataPatterns patternMatches = findPatternsInText(text, DataPatterns.patterns) if patternMatches: patternName = patternMatches[0][0] pattern = next((p for p in DataPatterns.patterns if p.name == patternName), None) if pattern: if text not in self.string_parser.mapping: # Generate a UUID for the placeholder import uuid placeholder_id = str(uuid.uuid4()) # Create placeholder in format [type.uuid] type_mapping = { 'email': 'email', 'phone': 'phone', 'name': 'name', 'address': 'address', 'id': 'id' } placeholderType = typeMapping.get(patternName, 'data') self.string_parser.mapping[text] = f"[{placeholderType}.{placeholderId}]" text = self.string_parser.mapping[text] else: # Check if text matches any custom names from the user list for name in self.string_parser.NamesToParse: if not name.strip(): continue if text.lower().strip() == name.lower().strip(): if text not in self.string_parser.mapping: # Generate a UUID for the placeholder import uuid placeholder_id = str(uuid.uuid4()) self.string_parser.mapping[text] = f"[name.{placeholder_id}]" text = self.string_parser.mapping[text] break # Process child elements children = [] for child in element: childStr = self._anonymizeXmlElement(child, indent + ' ') children.append(childStr) # Build element string if not children and not text: return f"{indent}<{element.tag}{attrs}/>" elif not children: return f"{indent}<{element.tag}{attrs}>{text}" else: result = [f"{indent}<{element.tag}{attrs}>"] if text: result.append(f"{indent} {text}") result.extend(children) result.append(f"{indent}") return '\n'.join(result) def processXmlContent(self, content: str) -> tuple: """ Process XML content Args: content: XML content to process Returns: Tuple of (processed_data, mapping, replaced_fields, processed_info) """ root = ET.fromstring(content) # Process XML recursively with proper formatting result = self._anonymizeXmlElement(root) processedInfo = {'type': 'xml'} return result, self.string_parser.getMapping(), [], processedInfo def getMapping(self) -> Dict[str, str]: """ Get the current mapping of original values to placeholders Returns: Dict[str, str]: Mapping dictionary """ return self.string_parser.getMapping() def clearMapping(self): """Clear the current mapping""" self.string_parser.clearMapping()