279 lines
11 KiB
Python
279 lines
11 KiB
Python
"""
|
|
List processing module for data anonymization
|
|
Handles structured data with headers (CSV, JSON, XML)
|
|
"""
|
|
|
|
import json
import uuid
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from io import StringIO
from typing import Any, Dict, List, Union

import pandas as pd

from modules.services.serviceNeutralization.subParseString import StringParser
from modules.services.serviceNeutralization.subPatterns import (
    DataPatterns,
    HeaderPatterns,
    find_patterns_in_text,
    get_pattern_for_header,
)
|
|
|
|
@dataclass
class TableData:
    """Represents tabular data (a header row plus data rows)."""

    # Column names, in order.
    headers: List[str]
    # Data rows; each inner list is aligned with ``headers``.
    rows: List[List[str]]
    # Origin of the data: 'csv', 'json', 'xml', 'text_table'
    source_type: str
|
|
|
|
class ListProcessor:
    """Handles structured data processing with headers for anonymization.

    Supports CSV, JSON and XML input. Sensitive values are replaced with
    placeholders and recorded in a shared original -> placeholder mapping
    owned by the underlying ``StringParser``.
    """

    # Pattern name -> type label used in "[type.uuid]" placeholders.
    # Unknown pattern names fall back to 'data'.
    _PLACEHOLDER_TYPES = {
        'email': 'email',
        'phone': 'phone',
        'name': 'name',
        'address': 'address',
        'id': 'id',
    }

    def __init__(self, NamesToParse: List[str] = None):
        """
        Initialize the list processor.

        Args:
            NamesToParse: List of names to parse and replace.
        """
        self.string_parser = StringParser(NamesToParse)
        self.header_patterns = HeaderPatterns.patterns

    def _uuid_placeholder(self, value: str, pattern_name: str) -> str:
        """
        Return the placeholder for *value*, creating a "[type.uuid]" entry
        in the shared mapping on first use.

        Args:
            value: Original sensitive value.
            pattern_name: Name of the matched pattern (selects the type label).

        Returns:
            str: Placeholder that *value* maps to.
        """
        if value not in self.string_parser.mapping:
            placeholder_type = self._PLACEHOLDER_TYPES.get(pattern_name, 'data')
            self.string_parser.mapping[value] = f"[{placeholder_type}.{uuid.uuid4()}]"
        return self.string_parser.mapping[value]

    def anonymize_table(self, table: TableData) -> TableData:
        """
        Anonymize table data based on headers.

        Columns whose header matches a known header pattern have every
        non-None cell replaced by a numbered placeholder from the pattern's
        template; identical originals reuse the same placeholder.

        Args:
            table: TableData object to anonymize.

        Returns:
            TableData: Anonymized copy; the input table is not modified.
        """
        anonymized_table = TableData(
            headers=table.headers.copy(),
            rows=[row.copy() for row in table.rows],
            source_type=table.source_type,
        )

        for i, header in enumerate(anonymized_table.headers):
            pattern = get_pattern_for_header(header, self.header_patterns)
            if not pattern:
                continue
            for row in anonymized_table.rows:
                if row[i] is None:
                    continue
                original = str(row[i])
                if original not in self.string_parser.mapping:
                    # Numbered placeholder (e.g. "[NAME_1]") so repeated
                    # values map to the same replacement.
                    self.string_parser.mapping[original] = (
                        pattern.replacement_template.format(len(self.string_parser.mapping) + 1)
                    )
                row[i] = self.string_parser.mapping[original]

        return anonymized_table

    def process_csv_content(self, content: str) -> tuple:
        """
        Process CSV content.

        Args:
            content: CSV content to process.

        Returns:
            Tuple of (processed_data, mapping, replaced_fields, processed_info)
            where processed_data is a pandas DataFrame (or None for an empty
            table) and replaced_fields lists each header whose column had at
            least one cell replaced.
        """
        df = pd.read_csv(StringIO(content), encoding='utf-8')
        table = TableData(
            headers=df.columns.tolist(),
            rows=df.values.tolist(),
            source_type='csv',
        )

        if not table.rows:
            return None, self.string_parser.get_mapping(), [], {
                'type': 'table', 'headers': table.headers, 'row_count': 0,
            }

        anonymized_table = self.anonymize_table(table)

        # One entry per header whose column changed (not per changed cell).
        replaced_fields = []
        for i, header in enumerate(anonymized_table.headers):
            if any(anon_row[i] != orig_row[i]
                   for orig_row, anon_row in zip(table.rows, anonymized_table.rows)):
                replaced_fields.append(header)

        # Convert back to DataFrame
        result = pd.DataFrame(anonymized_table.rows, columns=anonymized_table.headers)

        processed_info = {
            'type': 'table',
            'headers': table.headers,
            'row_count': len(table.rows),
        }

        return result, self.string_parser.get_mapping(), replaced_fields, processed_info

    def process_json_content(self, content: str) -> tuple:
        """
        Process JSON content.

        Args:
            content: JSON content to process.

        Returns:
            Tuple of (processed_data, mapping, replaced_fields, processed_info).
        """
        data = json.loads(content)

        # Process JSON recursively using string parser
        result = self.string_parser.process_json_value(data)

        processed_info = {'type': 'json'}

        return result, self.string_parser.get_mapping(), [], processed_info

    def anonymize_xml_element(self, element: ET.Element, indent: str = '') -> str:
        """
        Recursively process an XML element and return a formatted string.

        Attributes are anonymized when their name matches a header pattern,
        or when their value matches a data pattern. Text content is
        anonymized when it matches a data pattern or equals one of the
        user-supplied names (case-insensitive).

        Args:
            element: XML element to process.
            indent: Current indentation level.

        Returns:
            Formatted XML string.
        """
        # Process attributes
        processed_attrs = {}
        for attr_name, attr_value in element.attrib.items():
            pattern = get_pattern_for_header(attr_name, self.header_patterns)
            if pattern:
                processed_attrs[attr_name] = self._uuid_placeholder(attr_value, pattern.name)
                continue

            # Attribute name gave no hint; check the value against data patterns.
            matches = find_patterns_in_text(attr_value, DataPatterns.patterns)
            if matches and any(p.name == matches[0][0] for p in DataPatterns.patterns):
                processed_attrs[attr_name] = self._uuid_placeholder(attr_value, matches[0][0])
            else:
                processed_attrs[attr_name] = attr_value

        attrs = ' '.join(f'{k}="{v}"' for k, v in processed_attrs.items())
        attrs = f' {attrs}' if attrs else ''

        # Process text content; skip values that are already placeholders.
        text = element.text.strip() if element.text and element.text.strip() else ''
        if text and not self.string_parser.is_placeholder(text):
            pattern_matches = find_patterns_in_text(text, DataPatterns.patterns)
            if pattern_matches:
                pattern_name = pattern_matches[0][0]
                if any(p.name == pattern_name for p in DataPatterns.patterns):
                    text = self._uuid_placeholder(text, pattern_name)
            else:
                # No data pattern matched; check the user-supplied name list.
                for name in self.string_parser.NamesToParse:
                    if not name.strip():
                        continue
                    if text.lower().strip() == name.lower().strip():
                        text = self._uuid_placeholder(text, 'name')
                        break

        # Process child elements
        children = [
            self.anonymize_xml_element(child, indent + '  ')
            for child in element
        ]

        # Build element string
        if not children and not text:
            return f"{indent}<{element.tag}{attrs}/>"
        elif not children:
            return f"{indent}<{element.tag}{attrs}>{text}</{element.tag}>"
        else:
            result = [f"{indent}<{element.tag}{attrs}>"]
            if text:
                result.append(f"{indent}  {text}")
            result.extend(children)
            result.append(f"{indent}</{element.tag}>")
            return '\n'.join(result)

    def process_xml_content(self, content: str) -> tuple:
        """
        Process XML content.

        Args:
            content: XML content to process.

        Returns:
            Tuple of (processed_data, mapping, replaced_fields, processed_info).
        """
        root = ET.fromstring(content)

        # Process XML recursively with proper formatting
        result = self.anonymize_xml_element(root)

        processed_info = {'type': 'xml'}

        return result, self.string_parser.get_mapping(), [], processed_info

    def get_mapping(self) -> Dict[str, str]:
        """
        Get the current mapping of original values to placeholders.

        Returns:
            Dict[str, str]: Mapping dictionary.
        """
        return self.string_parser.get_mapping()

    def clear_mapping(self):
        """Clear the current mapping"""
        self.string_parser.clear_mapping()
|