gateway/modules/neutralizer/neutralizer.py
"""
DSGVO-konformer Daten-Neutralisierer für KI-Agentensysteme
Unterstützt TXT, JSON, CSV, Excel und Word-Dateien
Mehrsprachig: DE, EN, FR, IT
"""
import re
import json
import pandas as pd
import docx
from pathlib import Path
from typing import Dict, List, Tuple, Any, Union, Optional
from dataclasses import dataclass
import uuid
import logging
import traceback
import csv
from datetime import datetime
import xml.etree.ElementTree as ET
import os
import random
from io import StringIO
from modules.neutralizer.patterns import Pattern, HeaderPatterns, DataPatterns, get_pattern_for_header, find_patterns_in_text, TextTablePatterns
import base64
import binascii

# Configure logging
logger = logging.getLogger(__name__)


@dataclass
class TableData:
"""Repräsentiert Tabellendaten"""
headers: List[str]
rows: List[List[str]]
    source_type: str  # 'csv', 'json', 'xml', 'text_table'


@dataclass
class PlainText:
"""Repräsentiert normalen Text"""
content: str
    source_type: str  # 'txt', 'docx', 'text_plain'


@dataclass
class ProcessResult:
"""Result of content processing"""
data: Any
mapping: Dict[str, str]
replaced_fields: List[str]
    processed_info: Dict[str, Any]  # Additional processing information


class DataAnonymizer:
"""Hauptklasse für die Datenanonymisierung"""
    def __init__(self, names_to_parse: Optional[List[str]] = None):
"""Initialize the anonymizer with patterns and custom names
Args:
names_to_parse: List of names to parse and replace (case-insensitive)
"""
self.header_patterns = HeaderPatterns.patterns
self.data_patterns = DataPatterns.patterns
self.names_to_parse = names_to_parse or []
self.replaced_fields = set()
self.mapping = {}
self.processing_info = []
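
    # Small helper added to deduplicate the UUID-placeholder logic that was
    # repeated verbatim at every call site below. Placeholders use the
    # [type.uuid] format; unknown pattern names fall back to "data".
    def _get_placeholder(self, original: str, pattern_name: str = 'data') -> str:
        """Return a stable [type.uuid] placeholder for original, creating one if needed."""
        if original not in self.mapping:
            known_types = {'email', 'phone', 'name', 'address', 'id'}
            placeholder_type = pattern_name if pattern_name in known_types else 'data'
            self.mapping[original] = f"[{placeholder_type}.{uuid.uuid4()}]"
        return self.mapping[original]
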
    def _normalize_whitespace(self, text: str) -> str:
        """Normalize whitespace in text"""
        # Normalize line endings first; the collapse below then folds all
        # whitespace runs (including newlines) into single spaces
        text = text.replace('\r\n', '\n').replace('\r', '\n')
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

def _is_table_line(self, line: str) -> bool:
"""Check if a line represents a table row"""
return bool(re.match(r'^\s*[^:]+:\s*[^:]+$', line) or
re.match(r'^\s*[^\t]+\t[^\t]+$', line))
    def _extract_tables_from_text(self, content: str) -> Tuple[List[TableData], List[PlainText]]:
        """
        Extract tables and plain text from content.
        Note: table extraction from free text is not implemented yet, so the
        entire content is returned as a single plain-text section.
        Args:
            content: Content to process
        Returns:
            Tuple of (list of tables, list of plain text sections)
        """
        tables: List[TableData] = []
        plain_texts = [PlainText(content=content, source_type='text_plain')]
        return tables, plain_texts

def _anonymize_table(self, table: TableData) -> TableData:
"""Anonymize table data"""
try:
anonymized_table = TableData(
headers=table.headers.copy(),
rows=[row.copy() for row in table.rows],
source_type=table.source_type
)
for i, header in enumerate(anonymized_table.headers):
pattern = get_pattern_for_header(header, self.header_patterns)
if pattern:
                    for row in anonymized_table.rows:
                        # Skip empty cells (None, or NaN coming from pandas)
                        if row[i] is None or (isinstance(row[i], float) and pd.isna(row[i])):
                            continue
                        original = str(row[i])
                        if original not in self.mapping:
                            self.mapping[original] = pattern.replacement_template.format(len(self.mapping) + 1)
                        row[i] = self.mapping[original]
return anonymized_table
except Exception as e:
logger.error(f"Error anonymizing table: {str(e)}")
raise
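
    # Note: unlike the text/JSON/XML paths below, table cells use the
    # pattern's replacement_template with a running counter over the shared
    # mapping dict rather than [type.uuid] placeholders.
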
def _anonymize_plain_text(self, text: PlainText) -> PlainText:
"""Anonymize plain text content using simple search-and-replace approach"""
try:
current_text = text.content
# Step 1: Replace custom names first (simple regex search-and-replace)
for name in self.names_to_parse:
if not name.strip():
continue
# Create case-insensitive regex pattern with word boundaries
pattern = re.compile(r'\b' + re.escape(name.strip()) + r'\b', re.IGNORECASE)
# Find all matches for this name
matches = list(pattern.finditer(current_text))
# Replace each match with a placeholder
                # Process matches right to left so earlier spans stay valid
                for match in reversed(matches):
                    replacement = self._get_placeholder(match.group(), 'name')
                    start, end = match.span()
                    current_text = current_text[:start] + replacement + current_text[end:]
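            # Example: in "Anna ... Anna", replacing the rightmost match first
            # leaves the spans of the earlier, still-pending matches valid.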
            # Step 2: Replace pattern-based matches (emails, phones, etc.),
            # again right to left to avoid position shifts
            pattern_matches = find_patterns_in_text(current_text, self.data_patterns)
            for pattern_name, matched_text, start, end in reversed(pattern_matches):
                # Skip anything that is, or contains, a placeholder already;
                # the bracket check also covers the [type.uuid] format
                if '[' in matched_text or ']' in matched_text:
                    continue
                replacement = self._get_placeholder(matched_text, pattern_name)
                current_text = current_text[:start] + replacement + current_text[end:]
return PlainText(content=current_text, source_type=text.source_type)
except Exception as e:
logger.error(f"Error anonymizing plain text: {str(e)}")
raise
    def _anonymize_json_value(self, value: Any, key: Optional[str] = None) -> Any:
"""
Recursively anonymize JSON values based on their keys and content
Args:
value: Value to anonymize
key: Key name (if part of a key-value pair)
Returns:
Anonymized value
"""
if isinstance(value, dict):
return {k: self._anonymize_json_value(v, k) for k, v in value.items()}
elif isinstance(value, list):
return [self._anonymize_json_value(item) for item in value]
        elif isinstance(value, str):
            # A key matching a header pattern replaces the whole value
            if key:
                pattern = get_pattern_for_header(key, self.header_patterns)
                if pattern:
                    return self._get_placeholder(value, pattern.name)
            # Otherwise check whether the value itself matches a data
            # pattern or one of the custom names
            pattern_matches = find_patterns_in_text(value, self.data_patterns)
            if pattern_matches:
                return self._get_placeholder(value, pattern_matches[0][0])
            if self._find_custom_names(value):
                return self._get_placeholder(value, 'name')
            return value
        else:
            return value
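
    # Example: {"email": "max@example.com", "note": "ok"} becomes
    # {"email": "[email.<uuid>]", "note": "ok"} -- the key "email" matches a
    # header pattern, while "ok" matches neither a data pattern nor a name.
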
def _anonymize_xml_element(self, element: ET.Element, indent: str = '') -> str:
"""
Recursively process XML element and return formatted string
Args:
element: XML element to process
indent: Current indentation level
Returns:
Formatted XML string
"""
        # Process attributes: an attribute name matching a header pattern
        # wins; otherwise the attribute value is checked against the data
        # patterns
        processed_attrs = {}
        for attr_name, attr_value in element.attrib.items():
            pattern = get_pattern_for_header(attr_name, self.header_patterns)
            if pattern:
                processed_attrs[attr_name] = self._get_placeholder(attr_value, pattern.name)
                continue
            matches = find_patterns_in_text(attr_value, self.data_patterns)
            if matches:
                processed_attrs[attr_name] = self._get_placeholder(attr_value, matches[0][0])
            else:
                processed_attrs[attr_name] = attr_value
attrs = ' '.join(f'{k}="{v}"' for k, v in processed_attrs.items())
attrs = f' {attrs}' if attrs else ''
        # Process text content: check against data patterns first, then
        # against the custom names
        text = element.text.strip() if element.text and element.text.strip() else ''
        if text:
            pattern_matches = find_patterns_in_text(text, self.data_patterns)
            if pattern_matches:
                text = self._get_placeholder(text, pattern_matches[0][0])
            elif self._find_custom_names(text):
                text = self._get_placeholder(text, 'name')
# Process child elements
children = []
for child in element:
child_str = self._anonymize_xml_element(child, indent + ' ')
children.append(child_str)
# Build element string
if not children and not text:
return f"{indent}<{element.tag}{attrs}/>"
elif not children:
return f"{indent}<{element.tag}{attrs}>{text}</{element.tag}>"
else:
result = [f"{indent}<{element.tag}{attrs}>"]
if text:
result.append(f"{indent} {text}")
result.extend(children)
result.append(f"{indent}</{element.tag}>")
return '\n'.join(result)
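
    # Example: <user email="max@example.com">ok</user> is rendered as
    # <user email="[email.<uuid>]">ok</user>: the attribute name "email"
    # matches a header pattern and the text "ok" matches nothing.
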
def process_content(self, content: str, content_type: str) -> ProcessResult:
"""
Process content and return anonymized data
Args:
content: Content to process
content_type: Type of content ('csv', 'json', 'xml', 'text')
Returns:
ProcessResult: Contains anonymized data, mapping, replaced fields and processing info
"""
        try:
            # Heuristic binary check: treat content as binary only if it is
            # plausibly base64 (strict alphabet, valid padding, non-trivial
            # length) and the decoded bytes are not valid UTF-8 text. The
            # length guard avoids false positives on short plain words.
            is_binary = False
            stripped = content.strip()
            if len(stripped) >= 16 and len(stripped) % 4 == 0 and re.match(r'^[A-Za-z0-9+/]+={0,2}$', stripped):
                try:
                    decoded = base64.b64decode(stripped, validate=True)
                    decoded.decode('utf-8')  # decodes cleanly -> treat as text
                except binascii.Error:
                    is_binary = False  # not actually valid base64
                except UnicodeDecodeError:
                    is_binary = True  # base64-wrapped binary payload
            if is_binary:
                # TODO: Implement binary data neutralization. This requires:
                # 1. Detecting binary data types (images, audio, video, etc.)
                # 2. Implementing specific neutralization for each type
                # 3. Handling metadata and embedded content
                # 4. Preserving binary integrity while removing sensitive data
                return ProcessResult(content, self.mapping, [], {'type': 'binary', 'status': 'not_implemented'})
            replaced_fields = []
            processed_info = {}
            if content_type in ('csv', 'json', 'xml'):
                if content_type == 'json':
                    # Process JSON recursively
                    data = json.loads(content)
                    result = self._anonymize_json_value(data)
                    processed_info['type'] = 'json'
                    return ProcessResult(result, self.mapping, replaced_fields, processed_info)
                if content_type == 'xml':
                    # Process XML recursively with proper formatting
                    root = ET.fromstring(content)
                    result = self._anonymize_xml_element(root)
                    processed_info['type'] = 'xml'
                    return ProcessResult(result, self.mapping, replaced_fields, processed_info)
                # CSV: handle as a table
                df = pd.read_csv(StringIO(content))
                table = TableData(
                    headers=df.columns.tolist(),
                    rows=df.values.tolist(),
                    source_type='csv'
                )
                processed_info['type'] = 'table'
                processed_info['headers'] = table.headers
                processed_info['row_count'] = len(table.rows)
                if not table.rows:
                    return ProcessResult(None, self.mapping, [], processed_info)
                anonymized_table = self._anonymize_table(table)
                # Track which headers had at least one cell replaced
                for i, header in enumerate(anonymized_table.headers):
                    for orig_row, anon_row in zip(table.rows, anonymized_table.rows):
                        if anon_row[i] != orig_row[i]:
                            replaced_fields.append(header)
                            break
                # Convert back to the original tabular shape
                result = pd.DataFrame(anonymized_table.rows, columns=anonymized_table.headers)
                return ProcessResult(result, self.mapping, replaced_fields, processed_info)
            else:
                # Handle as free text; the table detector currently returns
                # the whole content as one plain-text section
                tables, plain_texts = self._extract_tables_from_text(content)
                processed_info['type'] = 'text'
                processed_info['tables'] = [{'headers': t.headers, 'row_count': len(t.rows)} for t in tables]
                # Anonymize each plain-text section
                anonymized_texts = [self._anonymize_plain_text(text) for text in plain_texts]
                # Stitch the anonymized sections back into the original content
                result = content
                for text, anonymized_text in zip(plain_texts, anonymized_texts):
                    if text.content != anonymized_text.content:
                        result = result.replace(text.content, anonymized_text.content)
                return ProcessResult(result, self.mapping, replaced_fields, processed_info)
        except Exception as e:
            logger.exception("Error processing content")
            return ProcessResult(None, self.mapping, [], {'type': 'error', 'error': str(e)})