gateway/modules/services/serviceNeutralization/subProcessList.py
2025-09-24 23:18:10 +02:00

279 lines
11 KiB
Python

"""
List processing module for data anonymization
Handles structured data with headers (CSV, JSON, XML)
"""
import json
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from io import StringIO
from typing import Any, Dict, List, Optional, Union

import pandas as pd

from modules.services.serviceNeutralization.subParseString import StringParser
from modules.services.serviceNeutralization.subPatterns import get_pattern_for_header, HeaderPatterns
@dataclass
class TableData:
    """Container for tabular data extracted from a structured source."""
    headers: List[str]       # column names, in order
    rows: List[List[str]]    # row-major cell values
    source_type: str         # one of: 'csv', 'json', 'xml', 'text_table'
class ListProcessor:
    """Handles structured data processing with headers for anonymization"""

    def __init__(self, NamesToParse: Optional[List[str]] = None):
        """
        Initialize the list processor.

        Args:
            NamesToParse: Optional list of personal names to detect and
                replace; forwarded to the underlying StringParser.
                (Fixed annotation: the default is None, so the parameter
                is Optional, not a plain List[str].)
        """
        # The string parser owns the original -> placeholder mapping that
        # every processing method in this class accumulates into.
        self.string_parser = StringParser(NamesToParse)
        # Header-name patterns used to decide which columns / attributes
        # contain sensitive data.
        self.header_patterns = HeaderPatterns.patterns
def anonymize_table(self, table: TableData) -> TableData:
    """
    Anonymize table data based on headers.

    Every column whose header matches a known header pattern has its cell
    values replaced by numbered placeholders built from the pattern's
    replacement template; the original -> placeholder mapping is
    accumulated on the shared string parser.

    Args:
        table: TableData object to anonymize
    Returns:
        TableData: A new table with sensitive columns replaced
        (the input table is left unmodified)
    """
    anonymized_table = TableData(
        headers=table.headers.copy(),
        rows=[row.copy() for row in table.rows],
        source_type=table.source_type
    )
    # Fix: removed dead code — the old loop generated a uuid4
    # (`placeholder_id`) per value and never used it; the placeholder
    # actually comes from the counter-based replacement template.
    mapping = self.string_parser.mapping
    for i, header in enumerate(anonymized_table.headers):
        pattern = get_pattern_for_header(header, self.header_patterns)
        if pattern is None:
            continue
        for row in anonymized_table.rows:
            if row[i] is None:
                continue
            original = str(row[i])
            if original not in mapping:
                # The template receives a 1-based running index.
                mapping[original] = pattern.replacement_template.format(len(mapping) + 1)
            row[i] = mapping[original]
    return anonymized_table
def process_csv_content(self, content: str) -> tuple:
    """
    Process CSV content and anonymize columns with sensitive headers.

    Args:
        content: Raw CSV text
    Returns:
        Tuple of (processed_data, mapping, replaced_fields, processed_info)
        where processed_data is a pandas DataFrame (None for an empty
        table) and replaced_fields lists each affected header once.
    """
    df = pd.read_csv(StringIO(content), encoding='utf-8')
    table = TableData(
        headers=df.columns.tolist(),
        rows=df.values.tolist(),
        source_type='csv'
    )
    if not table.rows:
        return None, self.string_parser.get_mapping(), [], {'type': 'table', 'headers': table.headers, 'row_count': 0}
    anonymized_table = self.anonymize_table(table)
    # Track which headers had at least one value replaced.
    # Fix: the old code appended the header once per changed *cell*,
    # filling replaced_fields with duplicates; now each header appears
    # at most once.
    replaced_fields = []
    for i, header in enumerate(anonymized_table.headers):
        if any(anon_row[i] != orig_row[i]
               for orig_row, anon_row in zip(table.rows, anonymized_table.rows)):
            replaced_fields.append(header)
    # Convert the anonymized rows back to a DataFrame for the caller.
    result = pd.DataFrame(anonymized_table.rows, columns=anonymized_table.headers)
    processed_info = {
        'type': 'table',
        'headers': table.headers,
        'row_count': len(table.rows)
    }
    return result, self.string_parser.get_mapping(), replaced_fields, processed_info
def process_json_content(self, content: str) -> tuple:
    """
    Parse JSON content and anonymize it recursively.

    Args:
        content: JSON document as a string
    Returns:
        Tuple of (processed_data, mapping, replaced_fields, processed_info)
    """
    parsed = json.loads(content)
    # The shared string parser walks the parsed structure recursively
    # and substitutes placeholders as it goes.
    anonymized = self.string_parser.process_json_value(parsed)
    return anonymized, self.string_parser.get_mapping(), [], {'type': 'json'}
def anonymize_xml_element(self, element: ET.Element, indent: str = '') -> str:
    """
    Recursively anonymize an XML element and render it as an indented string.

    An attribute value is replaced when the attribute *name* matches a
    header pattern or the *value* matches a data pattern; text content is
    replaced when it matches a data pattern or one of the user-supplied
    names. Placeholders have the form ``[type.uuid]``.

    Args:
        element: XML element to process
        indent: Current indentation prefix (grows by one space per level)
    Returns:
        Formatted XML string with sensitive values replaced
    """
    # Imported locally, mirroring the original inline imports
    # (presumably to avoid an import cycle — TODO confirm).
    from modules.services.serviceNeutralization.subPatterns import find_patterns_in_text, DataPatterns

    # --- attributes ---------------------------------------------------
    processed_attrs = {}
    for attr_name, attr_value in element.attrib.items():
        header_pattern = get_pattern_for_header(attr_name, self.header_patterns)
        if header_pattern:
            # The attribute name itself signals sensitive content.
            processed_attrs[attr_name] = self._register_placeholder(attr_value, header_pattern.name)
            continue
        matches = find_patterns_in_text(attr_value, DataPatterns.patterns)
        if matches:
            # The value looks like sensitive data (email, phone, ...).
            # matches[0][0] is the name of the first matching pattern.
            processed_attrs[attr_name] = self._register_placeholder(attr_value, matches[0][0])
        else:
            processed_attrs[attr_name] = attr_value
    attrs = ' '.join(f'{k}="{v}"' for k, v in processed_attrs.items())
    attrs = f' {attrs}' if attrs else ''

    # --- text content -------------------------------------------------
    text = element.text.strip() if element.text and element.text.strip() else ''
    # Skip text that is already a placeholder from a previous pass.
    if text and not self.string_parser.is_placeholder(text):
        pattern_matches = find_patterns_in_text(text, DataPatterns.patterns)
        if pattern_matches:
            text = self._register_placeholder(text, pattern_matches[0][0])
        else:
            # Fall back to the user-supplied name list (case-insensitive,
            # whitespace-insensitive exact match).
            for name in self.string_parser.NamesToParse:
                if name.strip() and text.lower().strip() == name.lower().strip():
                    text = self._register_placeholder(text, 'name')
                    break

    # --- children and serialization ------------------------------------
    children = [self.anonymize_xml_element(child, indent + ' ') for child in element]
    if not children and not text:
        return f"{indent}<{element.tag}{attrs}/>"
    if not children:
        return f"{indent}<{element.tag}{attrs}>{text}</{element.tag}>"
    result = [f"{indent}<{element.tag}{attrs}>"]
    if text:
        result.append(f"{indent} {text}")
    result.extend(children)
    result.append(f"{indent}</{element.tag}>")
    return '\n'.join(result)

def _register_placeholder(self, value: str, type_name: str) -> str:
    """
    Return the placeholder for *value*, creating a ``[type.uuid4]`` mapping
    entry on first use.

    This replaces three copy-pasted blocks from the original method; the
    old identity ``type_mapping`` dicts are reduced to a membership check.

    Args:
        value: Original sensitive value
        type_name: Pattern name; unknown names fall back to 'data'
    Returns:
        The (possibly pre-existing) placeholder string for *value*
    """
    if value not in self.string_parser.mapping:
        import uuid  # local import, as in the original inline usage
        tag = type_name if type_name in {'email', 'phone', 'name', 'address', 'id'} else 'data'
        self.string_parser.mapping[value] = f"[{tag}.{uuid.uuid4()}]"
    return self.string_parser.mapping[value]
def process_xml_content(self, content: str) -> tuple:
    """
    Parse XML content and anonymize it element by element.

    Args:
        content: XML document as a string
    Returns:
        Tuple of (processed_data, mapping, replaced_fields, processed_info)
        where processed_data is the re-serialized, anonymized XML string
    """
    # anonymize_xml_element walks the tree recursively and returns a
    # formatted string with placeholders substituted in.
    anonymized = self.anonymize_xml_element(ET.fromstring(content))
    return anonymized, self.string_parser.get_mapping(), [], {'type': 'xml'}
def get_mapping(self) -> Dict[str, str]:
    """
    Return the accumulated original-value -> placeholder mapping.

    Returns:
        Dict[str, str]: Mapping of original strings to their placeholders,
        as held by the underlying string parser.
    """
    current_mapping = self.string_parser.get_mapping()
    return current_mapping
def clear_mapping(self):
    """Reset the accumulated mapping held by the underlying string parser."""
    self.string_parser.clear_mapping()