gateway/modules/neutralizer/subProcessText.py
2025-09-22 00:39:15 +02:00

101 lines
3.3 KiB
Python

"""
Text processing module for data anonymization
Handles plain text processing without header information
"""
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

from modules.neutralizer.subParseString import StringParser
@dataclass
class PlainText:
    """A section of ordinary (non-tabular) text.

    Attributes:
        content: The raw text of the section.
        source_type: Label for where the text came from,
            e.g. 'txt', 'docx' or 'text_plain'.
    """

    content: str
    source_type: str
class TextProcessor:
    """Handles plain text processing for anonymization.

    All replacement work is delegated to a ``StringParser`` instance, which
    also owns the mapping exposed through ``get_mapping``/``clear_mapping``.
    """

    def __init__(self, names_to_parse: Optional[List[str]] = None):
        """Initialize the text processor.

        Args:
            names_to_parse: List of names to parse and replace. Passed
                through unchanged (including ``None``) to ``StringParser``.
        """
        self.string_parser = StringParser(names_to_parse)

    def extract_tables_from_text(
        self, content: str
    ) -> Tuple[List[Any], List[PlainText]]:
        """Split content into tables and plain text sections.

        Table detection is not implemented yet: the whole content is returned
        as a single ``PlainText`` section and the table list is always empty.

        Args:
            content: Content to process.

        Returns:
            Tuple of (list of tables, list of plain text sections).
        """
        # For now, process the entire content as plain text.
        # This can be extended later to detect table-like structures.
        tables: List[Any] = []
        plain_texts = [PlainText(content=content, source_type='text_plain')]
        return tables, plain_texts

    def anonymize_plain_text(self, text: PlainText) -> PlainText:
        """Anonymize one plain text section.

        Args:
            text: PlainText object to anonymize.

        Returns:
            A new PlainText holding the string parser's output for
            ``text.content`` and the same ``source_type``; the input object
            is not modified.
        """
        anonymized_content = self.string_parser.process_string(text.content)
        return PlainText(content=anonymized_content, source_type=text.source_type)

    def process_text_content(
        self, content: str
    ) -> Tuple[str, Dict[str, str], List[Any], Dict[str, Any]]:
        """Process text content and return anonymized data.

        Args:
            content: Text content to process.

        Returns:
            Tuple of (anonymized_content, mapping, replaced_fields,
            processed_info). ``replaced_fields`` is always an empty list here;
            ``processed_info`` is a dict with the keys 'type' and 'tables'.
        """
        # Extract tables and plain text sections
        tables, plain_texts = self.extract_tables_from_text(content)

        # Process plain text sections
        anonymized_texts = [self.anonymize_plain_text(text) for text in plain_texts]

        # Combine all processed content by substituting each changed section
        # back into the original content.
        result = content
        for text, anonymized_text in zip(plain_texts, anonymized_texts):
            if text.content != anonymized_text.content:
                result = result.replace(text.content, anonymized_text.content)

        # Summarize detected tables. The emptiness check must come first:
        # extract_tables_from_text currently always returns an empty list, and
        # indexing tables[0] on it would raise IndexError on every call.
        if tables and hasattr(tables[0], 'headers'):
            table_summaries = [
                {'headers': t.headers, 'row_count': len(t.rows)} for t in tables
            ]
        else:
            table_summaries = []

        processed_info = {
            'type': 'text',
            'tables': table_summaries,
        }
        return result, self.string_parser.get_mapping(), [], processed_info

    def get_mapping(self) -> Dict[str, str]:
        """Get the current mapping of original values to placeholders.

        Returns:
            Dict[str, str]: The mapping dictionary held by the string parser.
        """
        return self.string_parser.get_mapping()

    def clear_mapping(self):
        """Clear the current mapping held by the string parser."""
        self.string_parser.clear_mapping()