281 lines
11 KiB
Python
281 lines
11 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
List processing module for data anonymization
|
|
Handles structured data with headers (CSV, JSON, XML)
|
|
"""
|
|
|
|
import json
import uuid
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from io import StringIO
from typing import Any, Dict, List, Optional, Union

import pandas as pd

from .subParseString import StringParser
from .subPatterns import getPatternForHeader, HeaderPatterns
|
|
|
|
@dataclass
|
|
class TableData:
|
|
"""Repräsentiert Tabellendaten"""
|
|
headers: List[str]
|
|
rows: List[List[str]]
|
|
source_type: str # 'csv', 'json', 'xml', 'text_table'
|
|
|
|
class ListProcessor:
    """Handles structured data processing with headers for anonymization.

    Supports CSV, JSON and XML content. Discovered sensitive values are
    recorded in the shared ``StringParser`` mapping so the same original
    value always receives the same placeholder.
    """

    # Pattern name -> type label used in '[type.uuid]' placeholders.
    # Unknown pattern names fall back to 'data'.
    _PLACEHOLDER_TYPES = {
        'email': 'email',
        'phone': 'phone',
        'name': 'name',
        'address': 'address',
        'id': 'id',
    }

    def __init__(self, NamesToParse: Optional[List[str]] = None):
        """
        Initialize the list processor.

        Args:
            NamesToParse: List of names to parse and replace; may be None.
        """
        self.string_parser = StringParser(NamesToParse)
        self.header_patterns = HeaderPatterns.patterns

    def _uuidPlaceholder(self, value: str, patternName: str) -> str:
        """
        Return the '[type.uuid]' placeholder for *value*.

        A new placeholder is created and stored in the shared mapping on
        first sight; subsequent calls with the same value return the same
        placeholder. Replaces three copy-pasted variants of this logic.

        Args:
            value: Original sensitive value.
            patternName: Name of the matched pattern (selects the type label).

        Returns:
            str: Placeholder mapped to *value*.
        """
        if value not in self.string_parser.mapping:
            placeholderType = self._PLACEHOLDER_TYPES.get(patternName, 'data')
            self.string_parser.mapping[value] = f"[{placeholderType}.{uuid.uuid4()}]"
        return self.string_parser.mapping[value]

    def _anonymizeTable(self, table: TableData) -> TableData:
        """
        Anonymize table data based on headers.

        Cells in columns whose header matches a known header pattern are
        replaced with a placeholder built from the pattern's replacement
        template, numbered sequentially per newly seen value.

        Args:
            table: TableData object to anonymize.

        Returns:
            TableData: Anonymized copy; the input table is not modified.
        """
        anonymizedTable = TableData(
            headers=table.headers.copy(),
            rows=[row.copy() for row in table.rows],
            source_type=table.source_type,
        )

        for col, header in enumerate(anonymizedTable.headers):
            pattern = getPatternForHeader(header, self.header_patterns)
            if not pattern:
                continue
            for row in anonymizedTable.rows:
                if row[col] is None:
                    continue
                original = str(row[col])
                if original not in self.string_parser.mapping:
                    # NOTE: the old code also generated an unused uuid here
                    # (dead in-loop import); removed.
                    self.string_parser.mapping[original] = (
                        pattern.replacement_template.format(len(self.string_parser.mapping) + 1)
                    )
                row[col] = self.string_parser.mapping[original]

        return anonymizedTable

    def processCsvContent(self, content: str) -> tuple:
        """
        Process CSV content.

        Args:
            content: CSV content to process.

        Returns:
            Tuple of (processed_data, mapping, replaced_fields, processed_info)
            where processed_data is a pandas DataFrame (None for an empty
            table) and replaced_fields lists each header whose column had at
            least one value replaced.
        """
        df = pd.read_csv(StringIO(content), encoding='utf-8')
        table = TableData(
            headers=df.columns.tolist(),
            rows=df.values.tolist(),
            source_type='csv',
        )

        if not table.rows:
            return None, self.string_parser.getMapping(), [], {
                'type': 'table',
                'headers': table.headers,
                'row_count': 0,
            }

        anonymizedTable = self._anonymizeTable(table)

        # Track which headers had values replaced. Deduplicated while keeping
        # order (the old code appended the header once per changed cell).
        replacedFields = list(dict.fromkeys(
            header
            for i, header in enumerate(anonymizedTable.headers)
            for origRow, anonRow in zip(table.rows, anonymizedTable.rows)
            if anonRow[i] != origRow[i]
        ))

        # Convert back to DataFrame
        result = pd.DataFrame(anonymizedTable.rows, columns=anonymizedTable.headers)

        processedInfo = {
            'type': 'table',
            'headers': table.headers,
            'row_count': len(table.rows),
        }

        return result, self.string_parser.getMapping(), replacedFields, processedInfo

    def processJsonContent(self, content: str) -> tuple:
        """
        Process JSON content.

        Args:
            content: JSON content to process.

        Returns:
            Tuple of (processed_data, mapping, replaced_fields, processed_info).
            replaced_fields is always empty for JSON input.
        """
        data = json.loads(content)

        # Process JSON recursively using the string parser.
        result = self.string_parser.processJsonValue(data)

        processedInfo = {'type': 'json'}

        return result, self.string_parser.getMapping(), [], processedInfo

    def _anonymizeXmlElement(self, element: ET.Element, indent: str = '') -> str:
        """
        Recursively anonymize an XML element and return it as formatted text.

        Attribute values are replaced when the attribute name matches a header
        pattern or the value matches a data pattern. Text content is replaced
        when it matches a data pattern or equals one of the user-supplied
        names (case-insensitive).

        Args:
            element: XML element to process.
            indent: Current indentation prefix.

        Returns:
            str: Formatted XML string with sensitive values replaced.
        """
        # Local import mirrors the original lazy-import style (presumably to
        # avoid a circular import at module load — TODO confirm).
        from .subPatterns import findPatternsInText, DataPatterns

        # --- Attributes -------------------------------------------------
        processedAttrs = {}
        for attrName, attrValue in element.attrib.items():
            headerPattern = getPatternForHeader(attrName, self.header_patterns)
            if headerPattern:
                # The attribute name itself marks the value as sensitive.
                processedAttrs[attrName] = self._uuidPlaceholder(attrValue, headerPattern.name)
                continue
            matches = findPatternsInText(attrValue, DataPatterns.patterns)
            if matches and any(p.name == matches[0][0] for p in DataPatterns.patterns):
                processedAttrs[attrName] = self._uuidPlaceholder(attrValue, matches[0][0])
            else:
                processedAttrs[attrName] = attrValue

        attrs = ' '.join(f'{k}="{v}"' for k, v in processedAttrs.items())
        attrs = f' {attrs}' if attrs else ''

        # --- Text content -----------------------------------------------
        text = element.text.strip() if element.text and element.text.strip() else ''
        # Skip text that is already a placeholder from an earlier pass.
        if text and not self.string_parser._isPlaceholder(text):
            patternMatches = findPatternsInText(text, DataPatterns.patterns)
            if patternMatches:
                # BUGFIX: the old code created 'placeholder_id'/'type_mapping'
                # but then read 'placeholderId'/'typeMapping' — a NameError
                # when no attribute was anonymized, and a stale-UUID reuse
                # (duplicate placeholders) when one was. Use the helper.
                if any(p.name == patternMatches[0][0] for p in DataPatterns.patterns):
                    text = self._uuidPlaceholder(text, patternMatches[0][0])
            else:
                # Fall back to exact matches against the user-supplied names.
                for name in self.string_parser.NamesToParse:
                    if name.strip() and text.lower().strip() == name.lower().strip():
                        text = self._uuidPlaceholder(text, 'name')
                        break

        # --- Children ---------------------------------------------------
        children = [self._anonymizeXmlElement(child, indent + ' ') for child in element]

        # --- Build element string ----------------------------------------
        if not children and not text:
            return f"{indent}<{element.tag}{attrs}/>"
        if not children:
            return f"{indent}<{element.tag}{attrs}>{text}</{element.tag}>"
        result = [f"{indent}<{element.tag}{attrs}>"]
        if text:
            result.append(f"{indent} {text}")
        result.extend(children)
        result.append(f"{indent}</{element.tag}>")
        return '\n'.join(result)

    def processXmlContent(self, content: str) -> tuple:
        """
        Process XML content.

        Args:
            content: XML content to process.

        Returns:
            Tuple of (processed_data, mapping, replaced_fields, processed_info)
            where processed_data is the formatted, anonymized XML string.
            replaced_fields is always empty for XML input.
        """
        root = ET.fromstring(content)

        # Process XML recursively with proper formatting.
        result = self._anonymizeXmlElement(root)

        processedInfo = {'type': 'xml'}

        return result, self.string_parser.getMapping(), [], processedInfo

    def getMapping(self) -> Dict[str, str]:
        """
        Get the current mapping of original values to placeholders.

        Returns:
            Dict[str, str]: Mapping dictionary.
        """
        return self.string_parser.getMapping()

    def clearMapping(self):
        """Clear the current mapping."""
        self.string_parser.clearMapping()
|