484 lines
No EOL
21 KiB
Python
484 lines
No EOL
21 KiB
Python
"""
|
|
DSGVO-konformer Daten-Neutralisierer für KI-Agentensysteme
|
|
Unterstützt TXT, JSON, CSV, Excel und Word-Dateien
|
|
Mehrsprachig: DE, EN, FR, IT
|
|
"""
|
|
|
|
import base64
import csv
import json
import logging
import os
import random
import re
import traceback
import uuid
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime
from io import StringIO
from pathlib import Path
from typing import Dict, List, Tuple, Any, Union, Optional
from xml.sax.saxutils import escape

import docx
import pandas as pd

from modules.neutralizer.patterns import (
    Pattern,
    HeaderPatterns,
    DataPatterns,
    get_pattern_for_header,
    find_patterns_in_text,
    TextTablePatterns,
)
|
|
|
|
# Configure logging
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class TableData:
    """Tabular data: a header list plus the data rows that belong to it."""

    # Column names, in column order.
    headers: List[str]
    # Data rows; cells are addressed by the position of their header.
    rows: List[List[str]]
    # Origin of the table: 'csv', 'json', 'xml', 'text_table'
    source_type: str
|
|
|
|
@dataclass
class PlainText:
    """A section of ordinary (non-tabular) text."""

    # The text itself.
    content: str
    # Origin of the text: 'txt', 'docx', 'text_plain'
    source_type: str
|
|
|
|
@dataclass
class ProcessResult:
    """Outcome of one content-processing run."""

    # The processed payload; its concrete type depends on the content type.
    data: Any
    # Original value -> placeholder that replaced it.
    mapping: Dict[str, str]
    # Names of the fields in which something was replaced.
    replaced_fields: List[str]
    # Additional processing information
    processed_info: Dict[str, Any]
|
|
|
|
class DataAnonymizer:
    """Main class for data anonymization.

    Replaces personal data found in CSV, JSON, XML and free-text content with
    placeholders. Every ``original value -> placeholder`` pair is recorded in
    ``self.mapping``, which is shared by all calls on the same instance, so a
    value that occurs repeatedly always receives the same placeholder.
    """

    # Pattern names that are used verbatim as the placeholder type; every
    # other pattern name is labelled 'data'.
    _PLACEHOLDER_TYPES = frozenset({'email', 'phone', 'name', 'address', 'id'})
    # NOTE(review): the free-text path has never labelled regex hits as
    # 'name' (only custom names get that label). Kept as-is; confirm intent.
    _TEXT_PLACEHOLDER_TYPES = frozenset({'email', 'phone', 'address', 'id'})
    # Shape of the UUID placeholders produced by _placeholder_for().
    _PLACEHOLDER_RE = re.compile(r'\[[a-z]+\.[a-f0-9-]+\]')
    _BASE64_RE = re.compile(r'^[A-Za-z0-9+/]*={0,2}$')

    def __init__(self, names_to_parse: Optional[List[str]] = None):
        """Initialize the anonymizer with patterns and custom names.

        Args:
            names_to_parse: List of names to parse and replace (case-insensitive)
        """
        self.header_patterns = HeaderPatterns.patterns
        self.data_patterns = DataPatterns.patterns
        self.names_to_parse = names_to_parse or []
        self.replaced_fields = set()
        self.mapping = {}
        self.processing_info = []

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    def _normalize_whitespace(self, text: str) -> str:
        """Collapse every whitespace run to one space and trim both ends.

        Line breaks count as whitespace, so the result is a single line.
        """
        return re.sub(r'\s+', ' ', text).strip()

    def _is_table_line(self, line: str) -> bool:
        """Check if a line represents a table row.

        A line qualifies when it is either ``key: value`` with exactly one
        colon or two cells separated by exactly one tab.
        """
        return bool(re.match(r'^\s*[^:]+:\s*[^:]+$', line) or
                    re.match(r'^\s*[^\t]+\t[^\t]+$', line))

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None and for float NaN (an empty CSV cell in pandas)."""
        return value is None or (isinstance(value, float) and value != value)

    def _placeholder_for(self, original: str, pattern_name: str,
                         known_types: Optional[frozenset] = None) -> str:
        """Return the placeholder for *original*, creating it on first use.

        Args:
            original: The text that is being replaced (used as mapping key).
            pattern_name: Name of the pattern that matched; becomes the
                placeholder type when it is one of *known_types*, else 'data'.
            known_types: Accepted type labels; defaults to _PLACEHOLDER_TYPES.

        Returns:
            Placeholder in the format ``[type.uuid]``.
        """
        if original not in self.mapping:
            accepted = self._PLACEHOLDER_TYPES if known_types is None else known_types
            placeholder_type = pattern_name if pattern_name in accepted else 'data'
            self.mapping[original] = f"[{placeholder_type}.{uuid.uuid4()}]"
        return self.mapping[original]

    # ------------------------------------------------------------------
    # Custom names
    # ------------------------------------------------------------------

    def _custom_name_pattern(self) -> Optional[re.Pattern]:
        """Build one case-insensitive regex that matches any custom name.

        Names are tried longest first so that "Anna Müller" wins over "Anna";
        blank entries are ignored. Returns None when no usable name exists.
        """
        cleaned = [name.strip() for name in self.names_to_parse if name and name.strip()]
        if not cleaned:
            return None
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        ordered = sorted(dict.fromkeys(cleaned), key=len, reverse=True)
        alternation = '|'.join(re.escape(name) for name in ordered)
        return re.compile(r'\b(?:' + alternation + r')\b', re.IGNORECASE)

    def _find_custom_names(self, text: str) -> List[Tuple[str, str, int, int]]:
        """Find occurrences of the custom names in *text*.

        Returns:
            List of ``('name', matched_text, start, end)`` tuples, the same
            tuple shape this class unpacks from find_patterns_in_text().
        """
        pattern = self._custom_name_pattern()
        if pattern is None or not text:
            return []
        return [('name', hit.group(), hit.start(), hit.end())
                for hit in pattern.finditer(text)]

    def _replace_custom_names(self, text: str) -> str:
        """Replace every custom-name occurrence in *text* with its placeholder.

        All names are handled in a single pass, so a name can never match
        inside a placeholder that was inserted for another name.
        """
        pattern = self._custom_name_pattern()
        if pattern is None:
            return text
        return pattern.sub(lambda hit: self._placeholder_for(hit.group(), 'name'), text)

    # ------------------------------------------------------------------
    # Free text
    # ------------------------------------------------------------------

    def _extract_tables_from_text(self, content: str) -> "Tuple[List[TableData], List[PlainText]]":
        """Extract tables and plain text from content.

        No table detection is performed: the whole content is returned as a
        single plain-text section and the table list is always empty.

        Args:
            content: Content to process

        Returns:
            Tuple of (list of tables, list of plain text sections)
        """
        return [], [PlainText(content=content, source_type='text_plain')]

    def _anonymize_plain_text(self, text: "PlainText") -> "PlainText":
        """Anonymize plain text: custom names first, then pattern matches.

        Args:
            text: The text section to anonymize.

        Returns:
            A new PlainText with the same source_type and replaced content.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            # Step 1: custom names.
            current = self._replace_custom_names(text.content)

            # Step 2: pattern-based matches (emails, phones, etc.).
            # Spans of placeholders already in the text must not be touched,
            # e.g. by a digit pattern matching inside a UUID.
            protected = [hit.span() for hit in self._PLACEHOLDER_RE.finditer(current)]
            matches = sorted(find_patterns_in_text(current, self.data_patterns),
                             key=lambda hit: (hit[2], hit[3]))

            # Replace from right to left so earlier offsets stay valid;
            # `boundary` is the start of the leftmost replacement made so far.
            boundary = len(current)
            for pattern_name, matched_text, start, end in reversed(matches):
                if end > boundary:
                    continue  # overlaps a span that was already replaced
                if '[' in matched_text or ']' in matched_text:
                    continue  # contains placeholder characters
                if any(start < p_end and p_start < end for p_start, p_end in protected):
                    continue  # lies inside an existing placeholder
                replacement = self._placeholder_for(
                    matched_text, pattern_name, self._TEXT_PLACEHOLDER_TYPES)
                current = current[:start] + replacement + current[end:]
                boundary = start

            return PlainText(content=current, source_type=text.source_type)

        except Exception as e:
            logger.error(f"Error anonymizing plain text: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # Tables
    # ------------------------------------------------------------------

    def _anonymize_table(self, table: "TableData") -> "TableData":
        """Anonymize table data column by column.

        A column is anonymized when get_pattern_for_header() returns a pattern
        for its header; every non-missing cell of that column is replaced. The
        input table is not modified.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            anonymized_table = TableData(
                headers=table.headers.copy(),
                rows=[row.copy() for row in table.rows],
                source_type=table.source_type
            )

            for col, header in enumerate(anonymized_table.headers):
                pattern = get_pattern_for_header(header, self.header_patterns)
                if not pattern:
                    continue
                for row in anonymized_table.rows:
                    # Skip ragged rows and empty cells (None / NaN) instead of
                    # mapping the literal text 'nan' to a placeholder.
                    if col >= len(row) or self._is_missing(row[col]):
                        continue
                    original = str(row[col])
                    if original not in self.mapping:
                        self.mapping[original] = pattern.replacement_template.format(
                            len(self.mapping) + 1)
                    row[col] = self.mapping[original]

            return anonymized_table

        except Exception as e:
            logger.error(f"Error anonymizing table: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # JSON / XML
    # ------------------------------------------------------------------

    def _anonymize_by_content(self, value: str) -> str:
        """Replace *value* as a whole if its content looks sensitive.

        The first data-pattern hit decides the placeholder type; otherwise a
        custom-name hit yields a 'name' placeholder. Unmatched values are
        returned unchanged.
        """
        pattern_matches = find_patterns_in_text(value, self.data_patterns)
        if pattern_matches:
            return self._placeholder_for(value, pattern_matches[0][0])
        if self._find_custom_names(value):
            return self._placeholder_for(value, 'name')
        return value

    def _anonymize_labelled_value(self, label: Optional[str], value: str) -> str:
        """Anonymize a string that carries a label (JSON key, XML attribute name).

        The label is checked against the header patterns first; if it does
        not match, the value's own content is inspected. Blank values are
        returned unchanged so that '' never becomes a mapping key.
        """
        if not value.strip():
            return value
        if label:
            pattern = get_pattern_for_header(label, self.header_patterns)
            if pattern:
                return self._placeholder_for(value, pattern.name)
        return self._anonymize_by_content(value)

    def _anonymize_json_value(self, value: Any, key: str = None) -> Any:
        """Recursively anonymize JSON values based on their keys and content.

        Args:
            value: Value to anonymize
            key: Key name (if part of a key-value pair). List items inherit
                the key of the list they belong to.

        Returns:
            Anonymized value; non-string scalars are returned unchanged.
        """
        if isinstance(value, dict):
            return {k: self._anonymize_json_value(v, k) for k, v in value.items()}
        if isinstance(value, list):
            return [self._anonymize_json_value(item, key) for item in value]
        if isinstance(value, str):
            return self._anonymize_labelled_value(key, value)
        return value

    def _anonymize_xml_element(self, element: ET.Element, indent: str = '') -> str:
        """Recursively process an XML element and return it as a formatted string.

        Attribute values and element text are anonymized and XML-escaped.
        Text following a child element (``tail``) is not emitted.

        Args:
            element: XML element to process
            indent: Current indentation level

        Returns:
            Formatted XML string
        """
        # Attributes
        rendered = []
        for attr_name, attr_value in element.attrib.items():
            safe_value = self._anonymize_labelled_value(attr_name, attr_value)
            rendered.append(f'{attr_name}="{escape(safe_value, {chr(34): "&quot;"})}"')
        attrs = ' ' + ' '.join(rendered) if rendered else ''

        # Text content
        text = (element.text or '').strip()
        if text:
            text = escape(self._anonymize_by_content(text))

        # Child elements
        children = [self._anonymize_xml_element(child, indent + ' ') for child in element]

        # Build element string
        tag = element.tag
        if not children and not text:
            return f"{indent}<{tag}{attrs}/>"
        if not children:
            return f"{indent}<{tag}{attrs}>{text}</{tag}>"

        lines = [f"{indent}<{tag}{attrs}>"]
        if text:
            lines.append(f"{indent} {text}")
        lines.extend(children)
        lines.append(f"{indent}</{tag}>")
        return '\n'.join(lines)

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    @classmethod
    def _is_base64_text(cls, content: Any) -> bool:
        """Return True when *content* is base64 that decodes to valid UTF-8.

        NOTE(review): plain input that happens to satisfy this (including
        empty or whitespace-only content) is reported as 'binary' and
        returned untouched. Behavior kept as found; confirm it is intended.
        """
        try:
            if not cls._BASE64_RE.match(content.strip()):
                return False
            base64.b64decode(content).decode('utf-8')
            return True
        except (ValueError, AttributeError, TypeError):
            # binascii.Error and UnicodeDecodeError are both ValueErrors.
            return False

    def _process_csv(self, content: str) -> "ProcessResult":
        """Anonymize CSV content; the result data is a pandas DataFrame."""
        df = pd.read_csv(StringIO(content), encoding='utf-8')
        table = TableData(
            headers=df.columns.tolist(),
            rows=df.values.tolist(),
            source_type='csv'
        )
        processed_info = {
            'type': 'table',
            'headers': table.headers,
            'row_count': len(table.rows),
        }
        if not table.rows:
            return ProcessResult(None, self.mapping, [], processed_info)

        anonymized_table = self._anonymize_table(table)

        # A header is reported once if any of its cells changed. The identity
        # test keeps untouched NaN cells (NaN != NaN) from counting as changed.
        replaced_fields = [
            header
            for col, header in enumerate(anonymized_table.headers)
            if any(
                not (anon_row[col] is orig_row[col] or anon_row[col] == orig_row[col])
                for orig_row, anon_row in zip(table.rows, anonymized_table.rows)
            )
        ]

        result = pd.DataFrame(anonymized_table.rows, columns=anonymized_table.headers)
        return ProcessResult(result, self.mapping, replaced_fields, processed_info)

    def _process_text(self, content: str) -> "ProcessResult":
        """Anonymize free text; the result data is the anonymized string."""
        tables, plain_texts = self._extract_tables_from_text(content)
        processed_info = {
            'type': 'text',
            'tables': [{'headers': t.headers, 'row_count': len(t.rows)} for t in tables],
        }

        result = content
        for section in plain_texts:
            anonymized = self._anonymize_plain_text(section)
            if section.content != anonymized.content:
                result = result.replace(section.content, anonymized.content)

        return ProcessResult(result, self.mapping, [], processed_info)

    def process_content(self, content: str, content_type: str) -> "ProcessResult":
        """Process content and return anonymized data.

        Args:
            content: Content to process
            content_type: Type of content ('csv', 'json', 'xml'); any other
                value is handled as text

        Returns:
            ProcessResult: Contains anonymized data, mapping, replaced fields
            and processing info. On failure the error is logged and a result
            with ``data=None`` and ``processed_info['type'] == 'error'`` is
            returned instead of raising.
        """
        try:
            if self._is_base64_text(content):
                # TODO: Implement binary data neutralization
                # This would require:
                # 1. Detecting binary data types (images, audio, video, etc.)
                # 2. Implementing specific neutralization for each type
                # 3. Handling metadata and embedded content
                # 4. Preserving binary integrity while removing sensitive data
                return ProcessResult(content, self.mapping, [],
                                     {'type': 'binary', 'status': 'not_implemented'})

            if content_type == 'csv':
                return self._process_csv(content)

            if content_type == 'json':
                data = self._anonymize_json_value(json.loads(content))
                return ProcessResult(data, self.mapping, [], {'type': 'json'})

            if content_type == 'xml':
                # NOTE(review): xml.etree is not hardened against maliciously
                # constructed documents; review if content can be untrusted.
                xml_text = self._anonymize_xml_element(ET.fromstring(content))
                return ProcessResult(xml_text, self.mapping, [], {'type': 'xml'})

            return self._process_text(content)

        except Exception as e:
            logger.error(f"Error processing content: {str(e)}")
            return ProcessResult(None, self.mapping, [], {'type': 'error', 'error': str(e)})