# gateway/modules/neutralizer/neutralizer.py
# Last modified: 2025-06-10 18:19:33 +02:00 — 368 lines, 15 KiB, Python, no trailing newline
"""
DSGVO-konformer Daten-Neutralisierer für KI-Agentensysteme
Unterstützt TXT, JSON, CSV, Excel und Word-Dateien
Mehrsprachig: DE, EN, FR, IT
"""
import re
import json
import pandas as pd
import docx
from pathlib import Path
from typing import Dict, List, Tuple, Any, Union, Optional
from dataclasses import dataclass
import uuid
import logging
import traceback
import csv
from datetime import datetime
import xml.etree.ElementTree as ET
import os
import random
from io import StringIO
from patterns import Pattern, HeaderPatterns, DataPatterns, get_pattern_for_header, find_patterns_in_text, TextTablePatterns
import base64
# Configure logging
logger = logging.getLogger(__name__)
@dataclass
class TableData:
    """A parsed table: column headers plus data rows.

    ``source_type`` records the table's origin:
    'csv', 'json', 'xml' or 'text_table'.
    """
    headers: List[str]
    rows: List[List[str]]
    source_type: str
@dataclass
class PlainText:
    """A section of ordinary (non-tabular) text.

    ``source_type`` records where the text came from:
    'txt', 'docx' or 'text_plain'.
    """
    content: str
    source_type: str
@dataclass
class ProcessResult:
    """Outcome of a content-processing run.

    Attributes:
        data: the anonymized payload (type mirrors the input format).
        mapping: original value -> placeholder substitutions applied.
        replaced_fields: names of fields/columns whose values were replaced.
        processed_info: extra processing metadata (type, headers, errors, ...).
    """
    data: Any
    mapping: Dict[str, str]
    replaced_fields: List[str]
    processed_info: Dict[str, Any]
class DataAnonymizer:
    """Core class for data anonymization.

    Detects personal data via header patterns (matched against column/key
    names) and data patterns (regexes matched against free text), and
    replaces each distinct original value with a stable placeholder.
    The original -> placeholder mapping accumulates in ``self.mapping``
    across calls, so a repeated value always receives the same placeholder.
    """

    def __init__(self):
        """Initialize the anonymizer with the configured pattern sets."""
        # Patterns keyed on column/field names (e.g. "email", "name").
        self.header_patterns = HeaderPatterns.patterns
        # Patterns matched against raw text content.
        self.data_patterns = DataPatterns.patterns
        # NOTE(review): kept for interface compatibility; only ``mapping``
        # is actually populated by the methods below.
        self.replaced_fields = set()
        # original value -> placeholder string
        self.mapping = {}
        self.processing_info = []

    def _placeholder(self, original: str, template: str) -> str:
        """Return the stable placeholder for *original*, creating it on first use.

        The placeholder index is the running size of the mapping at
        creation time, so placeholders are numbered in discovery order.
        """
        if original not in self.mapping:
            self.mapping[original] = template.format(len(self.mapping) + 1)
        return self.mapping[original]

    def _normalize_whitespace(self, text: str) -> str:
        """Collapse whitespace runs to single spaces and strip the result.

        NOTE(review): the line-ending replacement below is a no-op because
        the preceding ``\\s+`` collapse already removed all newlines —
        confirm intended order before relying on this helper.
        """
        text = re.sub(r'\s+', ' ', text)
        text = text.replace('\r\n', '\n').replace('\r', '\n')
        return text.strip()

    def _is_table_line(self, line: str) -> bool:
        """Return True when a line looks like a 'key: value' pair or a
        single tab-separated two-column row."""
        return bool(re.match(r'^\s*[^:]+:\s*[^:]+$', line) or
                    re.match(r'^\s*[^\t]+\t[^\t]+$', line))

    def _extract_tables_from_text(self, content: str) -> Tuple[List[TableData], List[PlainText]]:
        """
        Split raw content into tables and plain-text sections.

        Table extraction is currently disabled: the entire content is
        returned as one plain-text section and the table list is empty.

        Args:
            content: Content to process

        Returns:
            Tuple of (list of tables, list of plain text sections)
        """
        return [], [PlainText(content=content, source_type='text_plain')]

    def _anonymize_table(self, table: TableData) -> TableData:
        """Return a copy of *table* with values in recognized columns replaced.

        Columns whose header matches a header pattern have every non-None
        cell replaced by a placeholder; other columns are left untouched.

        Raises:
            Exception: re-raised after logging if anonymization fails.
        """
        try:
            anonymized = TableData(
                headers=table.headers.copy(),
                rows=[row.copy() for row in table.rows],
                source_type=table.source_type,
            )
            for col, header in enumerate(anonymized.headers):
                pattern = get_pattern_for_header(header, self.header_patterns)
                if not pattern:
                    continue
                for row in anonymized.rows:
                    # None cells stay as-is; everything else is stringified
                    # before lookup so numeric cells map consistently.
                    if row[col] is not None:
                        row[col] = self._placeholder(str(row[col]), pattern.replacement_template)
            return anonymized
        except Exception as e:
            logger.error(f"Error anonymizing table: {str(e)}")
            logger.debug(traceback.format_exc())
            raise

    def _anonymize_plain_text(self, text: PlainText) -> PlainText:
        """Replace every data-pattern match in *text* with its placeholder.

        The whole text is scanned once; replacements are applied from the
        last match to the first so earlier offsets remain valid.

        Raises:
            Exception: re-raised after logging if anonymization fails.
        """
        try:
            current_text = text.content
            matches = find_patterns_in_text(current_text, self.data_patterns)
            # Process matches back-to-front (sorted by start offset,
            # descending) to avoid position shifting.
            for pattern_name, matched_text, start, end in sorted(matches, key=lambda m: m[2], reverse=True):
                # Skip text that is already a placeholder like [EMAIL_1].
                if re.match(r'\[[A-Z_]+\d+\]', matched_text):
                    continue
                pattern = next((p for p in self.data_patterns if p.name == pattern_name), None)
                if pattern is None:
                    continue
                replacement = self._placeholder(matched_text, pattern.replacement_template)
                if pattern_name == 'email':
                    # Trace email replacements at debug level
                    # (was a stray print() in production code).
                    logger.debug("Replacing email %r with %r", matched_text, replacement)
                current_text = current_text[:start] + replacement + current_text[end:]
            return PlainText(content=current_text, source_type=text.source_type)
        except Exception as e:
            logger.error(f"Error anonymizing plain text: {str(e)}")
            logger.debug(traceback.format_exc())
            raise

    def _anonymize_json_value(self, value: Any, key: Optional[str] = None) -> Any:
        """
        Recursively anonymize JSON values based on their keys and content.

        Dicts and lists are traversed; strings are replaced when either the
        owning *key* matches a header pattern or the value itself matches a
        data pattern.  Non-string scalars are returned unchanged.

        Args:
            value: Value to anonymize
            key: Key name (if part of a key-value pair)

        Returns:
            Anonymized value
        """
        if isinstance(value, dict):
            return {k: self._anonymize_json_value(v, k) for k, v in value.items()}
        if isinstance(value, list):
            # List items deliberately lose their parent key.
            return [self._anonymize_json_value(item) for item in value]
        if isinstance(value, str):
            if key:
                pattern = get_pattern_for_header(key, self.header_patterns)
                if pattern:
                    return self._placeholder(value, pattern.replacement_template)
            matches = find_patterns_in_text(value, self.data_patterns)
            if matches:
                # NOTE: value-level matches use a bare NAME_N placeholder
                # rather than the pattern's replacement template.
                pattern_name = matches[0][0]
                if value not in self.mapping:
                    self.mapping[value] = f"{pattern_name.upper()}_{len(self.mapping) + 1}"
                return self.mapping[value]
            return value
        return value

    def _anonymize_xml_element(self, element: ET.Element, indent: str = '') -> str:
        """
        Recursively serialize *element* with sensitive attributes and text
        replaced by placeholders.

        Attribute names are checked against header patterns first; attribute
        values and element text fall back to data patterns.

        Args:
            element: XML element to process
            indent: Current indentation prefix

        Returns:
            Formatted XML string
        """
        processed_attrs = {}
        for attr_name, attr_value in element.attrib.items():
            pattern = get_pattern_for_header(attr_name, self.header_patterns)
            if pattern:
                processed_attrs[attr_name] = self._placeholder(attr_value, pattern.replacement_template)
                continue
            matches = find_patterns_in_text(attr_value, self.data_patterns)
            if matches:
                pattern = next((p for p in self.data_patterns if p.name == matches[0][0]), None)
                if pattern:
                    processed_attrs[attr_name] = self._placeholder(attr_value, pattern.replacement_template)
                    continue
            processed_attrs[attr_name] = attr_value
        attrs = ' '.join(f'{k}="{v}"' for k, v in processed_attrs.items())
        attrs = f' {attrs}' if attrs else ''

        # Element text: replace only when a data pattern matches it.
        text = element.text.strip() if element.text and element.text.strip() else ''
        if text:
            matches = find_patterns_in_text(text, self.data_patterns)
            if matches:
                pattern = next((p for p in self.data_patterns if p.name == matches[0][0]), None)
                if pattern:
                    text = self._placeholder(text, pattern.replacement_template)

        children = [self._anonymize_xml_element(child, indent + ' ') for child in element]

        # Serialize: self-closing, text-only, or nested element.
        if not children and not text:
            return f"{indent}<{element.tag}{attrs}/>"
        if not children:
            return f"{indent}<{element.tag}{attrs}>{text}</{element.tag}>"
        result = [f"{indent}<{element.tag}{attrs}>"]
        if text:
            result.append(f"{indent} {text}")
        result.extend(children)
        result.append(f"{indent}</{element.tag}>")
        return '\n'.join(result)

    def _is_binary_content(self, content: str) -> bool:
        """Heuristic: does *content* look like base64-encoded binary data?

        Content counts as binary only when it strictly decodes as base64
        AND the decoded bytes are not valid UTF-8.  Anything that is not
        base64 at all is ordinary text, NOT binary.  (The previous logic
        flagged every non-base64 input as binary, so normal text was
        returned unprocessed.)  ``validate=True`` rejects inputs containing
        non-alphabet characters — e.g. prose with spaces — up front; note
        this also rejects MIME-wrapped base64 containing newlines.
        """
        try:
            # binascii.Error (malformed base64) is a ValueError subclass.
            decoded = base64.b64decode(content, validate=True)
        except (ValueError, TypeError):
            return False
        try:
            decoded.decode('utf-8')
        except UnicodeDecodeError:
            return True
        return False

    def process_content(self, content: str, content_type: str) -> ProcessResult:
        """
        Process content and return anonymized data.

        Args:
            content: Content to process
            content_type: Type of content ('csv', 'json', 'xml', 'text')

        Returns:
            ProcessResult: anonymized data, mapping, replaced fields and
            processing info.  On failure ``data`` is None and
            ``processed_info`` carries the error message.
        """
        try:
            if self._is_binary_content(content):
                # TODO: Implement binary data neutralization.
                # This would require:
                # 1. Detecting binary data types (images, audio, video, etc.)
                # 2. Implementing specific neutralization for each type
                # 3. Handling metadata and embedded content
                # 4. Preserving binary integrity while removing sensitive data
                return ProcessResult(content, self.mapping, [], {'type': 'binary', 'status': 'not_implemented'})

            replaced_fields = []
            processed_info = {}

            if content_type == 'json':
                result = self._anonymize_json_value(json.loads(content))
                processed_info['type'] = 'json'
                return ProcessResult(result, self.mapping, replaced_fields, processed_info)

            if content_type == 'xml':
                result = self._anonymize_xml_element(ET.fromstring(content))
                processed_info['type'] = 'xml'
                return ProcessResult(result, self.mapping, replaced_fields, processed_info)

            if content_type == 'csv':
                # StringIO is already decoded text, so no encoding argument
                # is needed for read_csv.
                df = pd.read_csv(StringIO(content))
                table = TableData(
                    headers=df.columns.tolist(),
                    rows=df.values.tolist(),
                    source_type='csv',
                )
                processed_info['type'] = 'table'
                processed_info['headers'] = table.headers
                processed_info['row_count'] = len(table.rows)
                if not table.rows:
                    return ProcessResult(None, self.mapping, [], processed_info)
                anonymized_table = self._anonymize_table(table)
                # Record which headers had at least one value replaced
                # (one entry per changed cell, matching prior behavior).
                for i, header in enumerate(anonymized_table.headers):
                    for orig_row, anon_row in zip(table.rows, anonymized_table.rows):
                        if anon_row[i] != orig_row[i]:
                            replaced_fields.append(header)
                result = pd.DataFrame(anonymized_table.rows, columns=anonymized_table.headers)
                return ProcessResult(result, self.mapping, replaced_fields, processed_info)

            # Fallback: treat everything else as plain text.
            tables, plain_texts = self._extract_tables_from_text(content)
            processed_info['type'] = 'text'
            processed_info['tables'] = [{'headers': t.headers, 'row_count': len(t.rows)} for t in tables]
            anonymized_texts = [self._anonymize_plain_text(t) for t in plain_texts]
            # Splice each anonymized section back into the original content.
            result = content
            for original, anonymized in zip(plain_texts, anonymized_texts):
                if original.content != anonymized.content:
                    result = result.replace(original.content, anonymized.content)
            return ProcessResult(result, self.mapping, replaced_fields, processed_info)
        except Exception as e:
            logger.error(f"Error processing content: {str(e)}")
            logger.debug(traceback.format_exc())
            return ProcessResult(None, self.mapping, [], {'type': 'error', 'error': str(e)})