# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
|
||
String parsing and replacement utilities for data anonymization
|
||
Handles pattern matching and replacement for emails, phones, addresses, IDs and names
|
||
"""
|
||
|
||
import re
import uuid
from typing import Any, Dict, List, Optional, Tuple

from .subPatterns import DataPatterns, findPatternsInText
# Phrases or words that must never be neutralized (labels, Anrede, etc.)
|
||
_NEUTRALIZATION_BLACKLIST = frozenset({
|
||
"Für Sie", "Ihre Ansprechperson", "AXA 24", "General Agent",
|
||
"Your Contact", "Contact Person", "Bei Fragen", "Mit Freundlichen",
|
||
"Frau", "Herr", # Anrede
|
||
"Reise", "Reisebeginn", "Reiseende", "Vertragsbeginn", "Zahlbar",
|
||
"Versicherte", "Versicherungsnehmer", "Versicherung", "Insurance",
|
||
"Leistungen", "Basis", "Benefits", # Section labels
|
||
"Start", "Beginn", "Ende", "End", "trip", # Contract labels (Start of trip, End of trip, etc.)
|
||
})
|
||
|
||
|
||
class StringParser:
    """Replace sensitive substrings with ``[type.uuid]`` placeholders.

    Each string goes through two passes: first the pattern-based matches
    reported by ``findPatternsInText``, then the caller-supplied names.
    Every replaced original is stored in ``self.mapping`` so that a value
    seen again receives the same placeholder.
    """

    # Pattern names that are reused verbatim as the placeholder tag; any
    # other pattern name is tagged 'data'.
    _KNOWN_PLACEHOLDER_TYPES = frozenset({
        'email', 'phone', 'address', 'date', 'policy', 'name', 'id', 'iban', 'ssn',
    })

    # Shape of a placeholder: [tag.hex-and-dashes]
    _PLACEHOLDER_BODY = r'\[[a-z]+\.[a-f0-9-]+\]'
    _WHOLE_PLACEHOLDER_RE = re.compile(r'^' + _PLACEHOLDER_BODY + r'$')
    _EMBEDDED_PLACEHOLDER_RE = re.compile(_PLACEHOLDER_BODY)

    def __init__(self, NamesToParse: Optional[List[str]] = None):
        """
        Initialize the string parser

        Args:
            NamesToParse: List of names to parse and replace (case-insensitive);
                None is treated as an empty list
        """
        self.data_patterns = DataPatterns.patterns
        self.NamesToParse = NamesToParse or []
        # original text -> placeholder
        self.mapping: Dict[str, str] = {}

    def _isPlaceholder(self, text: str) -> bool:
        """
        Check if text is already a placeholder in format [tag.uuid]

        Args:
            text: Text to check

        Returns:
            bool: True if the whole text is a single placeholder
        """
        return bool(self._WHOLE_PLACEHOLDER_RE.match(text))

    @staticmethod
    def _isContained(match, allMatches) -> bool:
        """
        Tell whether a match lies fully inside a strictly longer match

        Args:
            match: Tuple whose items 2 and 3 are the start and end offsets
            allMatches: All matches found in the same text

        Returns:
            bool: True if some other match covers this one and is longer
        """
        start, end = match[2], match[3]
        for other in allMatches:
            if other is match:
                continue
            covers = other[2] <= start and end <= other[3]
            if covers and (other[3] - other[2]) > (end - start):
                return True
        return False

    def _isReplaceable(self, matchedText: str) -> bool:
        """
        Apply the skip rules for a pattern match

        Args:
            matchedText: The matched substring

        Returns:
            bool: False for placeholders, text containing '[' or ']', and
                text that is or contains a blacklisted entry; True otherwise
        """
        if self._isPlaceholder(matchedText):
            return False
        # Skip anything carrying placeholder delimiters
        if '[' in matchedText or ']' in matchedText:
            return False
        # Skip blacklisted text (labels, salutations, etc.) - never neutralize
        if matchedText.strip() in _NEUTRALIZATION_BLACKLIST:
            return False
        # Skip if any single word is blacklisted (e.g. "2026 Reise" from the address pattern)
        return not any(word in _NEUTRALIZATION_BLACKLIST for word in matchedText.split())

    @staticmethod
    def _dropOverlappingMatches(matches: List[Tuple[Any, ...]]) -> List[Tuple[Any, ...]]:
        """
        Order matches left to right and drop any that overlap an earlier one

        Replacing by slicing is only safe for disjoint spans, so of two
        overlapping matches the leftmost (and, on equal start, the longest)
        is kept; on an exact tie the first-listed match wins.

        Args:
            matches: Tuples whose items 2 and 3 are the start and end offsets

        Returns:
            list: Disjoint matches sorted by start offset
        """
        kept: List[Tuple[Any, ...]] = []
        for match in sorted(matches, key=lambda m: (m[2], -m[3])):
            if kept and match[2] < kept[-1][3]:
                continue
            kept.append(match)
        return kept

    def _replacePatternMatches(self, text: str) -> str:
        """
        Replace pattern-based matches (emails, phones, etc.) in text

        Args:
            text: Text to process

        Returns:
            str: Text with pattern matches replaced
        """
        found = findPatternsInText(text, self.data_patterns)

        # Exclude matches fully contained in a longer match (e.g. skip "2026" inside "17.02.2026")
        outermost = [m for m in found if not self._isContained(m, found)]

        # Skip rules run before overlap resolution so that a skipped match
        # never shadows a replaceable one.
        candidates = [m for m in outermost if self._isReplaceable(m[1])]

        # Process from right to left to avoid position shifts
        for patternName, matchedText, start, end in reversed(self._dropOverlappingMatches(candidates)):
            if matchedText not in self.mapping:
                tag = patternName if patternName in self._KNOWN_PLACEHOLDER_TYPES else 'data'
                self.mapping[matchedText] = f"[{tag}.{uuid.uuid4()}]"
            text = text[:start] + self.mapping[matchedText] + text[end:]

        return text

    def _replaceCustomNames(self, text: str) -> str:
        """
        Replace custom names from the user list in text

        Matching is case-insensitive and limited to whole words, but the
        mapping is keyed by the text exactly as it appears, so differently
        cased spellings receive separate placeholders. Text inside an
        existing placeholder is never touched.

        Args:
            text: Text to process

        Returns:
            str: Text with custom names replaced
        """
        for name in self.NamesToParse:
            needle = name.strip()
            if not needle:
                continue

            # Lookarounds instead of \b: identical for names that begin and end
            # with a word character, and still correct for names such as "Jr."
            namePattern = re.compile(
                r'(?<!\w)' + re.escape(needle) + r'(?!\w)', re.IGNORECASE
            )

            # Spans of placeholders already in the text, including those
            # produced for previously processed names
            protectedSpans = [m.span() for m in self._EMBEDDED_PLACEHOLDER_RE.finditer(text)]

            # Process from right to left to avoid position shifts
            for match in reversed(list(namePattern.finditer(text))):
                start, end = match.span()
                if any(start < pEnd and pStart < end for pStart, pEnd in protectedSpans):
                    continue

                matchedText = match.group()
                if matchedText not in self.mapping:
                    self.mapping[matchedText] = f"[name.{uuid.uuid4()}]"
                text = text[:start] + self.mapping[matchedText] + text[end:]

        return text

    def processString(self, text: str) -> str:
        """
        Process a string by replacing patterns first, then custom names

        Args:
            text: Text to process

        Returns:
            str: Processed text with replacements; a text that is already a
                single placeholder is returned unchanged
        """
        if self._isPlaceholder(text):
            return text

        # Step 1: pattern-based matches, step 2: custom names
        text = self._replacePatternMatches(text)
        return self._replaceCustomNames(text)

    def processJsonValue(self, value: Any) -> Any:
        """
        Process a JSON value for anonymization

        Strings are anonymized; dicts and lists are rebuilt with their values
        processed recursively (dict keys are kept as they are); every other
        value is returned unchanged.

        Args:
            value: Value to process

        Returns:
            Any: Processed value
        """
        if isinstance(value, str):
            return self.processString(value)
        if isinstance(value, dict):
            return {k: self.processJsonValue(v) for k, v in value.items()}
        if isinstance(value, list):
            return [self.processJsonValue(item) for item in value]
        return value

    def getMapping(self) -> Dict[str, str]:
        """
        Get the current mapping of original values to placeholders

        Returns:
            Dict[str, str]: Shallow copy of the mapping dictionary
        """
        return self.mapping.copy()

    def clearMapping(self):
        """Clear the current mapping"""
        self.mapping.clear()