# Copyright (c) 2025 Patrick Motsch
# All rights reserved.

"""
String parsing and replacement utilities for data anonymization.

Handles pattern matching and replacement for emails, phones, addresses,
IDs and names.
"""

||
import re
import uuid
from typing import Any, Dict, List, Optional, Tuple

from .subPatterns import DataPatterns, findPatternsInText
||
|
||
# Phrases or words that must never be neutralized (labels, Anrede, etc.)
|
||
_NEUTRALIZATION_BLACKLIST = frozenset({
|
||
"Für Sie", "Ihre Ansprechperson", "AXA 24", "General Agent",
|
||
"Your Contact", "Contact Person", "Bei Fragen", "Mit Freundlichen",
|
||
"Frau", "Herr", # Anrede
|
||
"Reise", "Reisebeginn", "Reiseende", "Vertragsbeginn", "Zahlbar",
|
||
"Versicherte", "Versicherungsnehmer", "Versicherung", "Insurance",
|
||
"Leistungen", "Basis", "Benefits", # Section labels
|
||
"Start", "Beginn", "Ende", "End", "trip", # Contract labels (Start of trip, End of trip, etc.)
|
||
"incomplete", "Application", "Complete", "Pending", # Form/status labels, not addresses
|
||
"Marketing", "Verkaufsstrategien", "Qualitätsmanagement", # Business terms, not addresses
|
||
"Ausbildungsstätte", "Realschule", # Institution types, not city names
|
||
# Ambiguous substrings – match in Zurich, CHF, UID-Nr., websites, etc.
|
||
"CH", "DE", "FR", "IT", "Nr", "Nr.", "Nr:", "No", "No.", "No:",
|
||
"www", ".ch", ".com", ".org", ".net", "CHF",
|
||
# Labels that must never be neutralized
|
||
"Kontakt", "Kanzlei", "Telefon", "Matrikel-Nr", "Matrikel-Nr.", "Student ID", "Student-ID",
|
||
})
|
||
|
||
|
||
class StringParser:
    """Handles string parsing and replacement operations.

    Two-stage anonymization: pattern-based matches (emails, phones, etc.)
    are replaced first, then user-supplied names. Every distinct original
    value is mapped to a stable ``[type.uuid]`` placeholder, so the same
    value always yields the same placeholder within one parser instance.
    """

    # Pattern names with a dedicated placeholder type; anything else falls
    # back to the generic 'data' type. (Hoisted out of the replacement loop —
    # the original rebuilt an identity dict per match.)
    _KNOWN_PLACEHOLDER_TYPES = frozenset({
        'email', 'phone', 'address', 'date', 'policy',
        'name', 'id', 'iban', 'ssn',
    })

    def __init__(self, NamesToParse: Optional[List[str]] = None):
        """
        Initialize the string parser.

        Args:
            NamesToParse: List of names to parse and replace (case-insensitive).
        """
        self.data_patterns = DataPatterns.patterns
        self.NamesToParse = NamesToParse or []
        # Maps original sensitive text -> "[type.uuid]" placeholder.
        self.mapping: Dict[str, str] = {}

    def _isPlaceholder(self, text: str) -> bool:
        """
        Check if text is already a placeholder in format [tag.uuid].

        Args:
            text: Text to check

        Returns:
            bool: True if text is a placeholder
        """
        return bool(re.match(r'^\[[a-z]+\.[a-f0-9-]+\]$', text))

    def _replacePatternMatches(self, text: str) -> str:
        """
        Replace pattern-based matches (emails, phones, etc.) in text.

        Args:
            text: Text to process

        Returns:
            str: Text with pattern matches replaced
        """
        patternMatches = findPatternsInText(text, self.data_patterns)

        # Exclude matches fully contained in a longer match
        # (e.g. skip "2026" inside "17.02.2026").
        def is_contained(m, all_matches):
            for other in all_matches:
                if other is m:
                    continue
                if other[2] <= m[2] and m[3] <= other[3] and (other[3] - other[2]) > (m[3] - m[2]):
                    return True
            return False
        patternMatches = [m for m in patternMatches if not is_contained(m, patternMatches)]

        # Deduplicate: keep one match per (start, end) span — the same span
        # can be matched by multiple patterns.
        seen = set()
        unique_matches = []
        for m in patternMatches:
            key = (m[2], m[3])
            if key not in seen:
                seen.add(key)
                unique_matches.append(m)
        patternMatches = unique_matches

        # Exclude address matches that overlap with date matches
        # (e.g. "2026 den" overlaps "17.02.2026").
        def overlaps(a_start, a_end, b_start, b_end):
            return a_start < b_end and b_start < a_end
        date_ranges = [(m[2], m[3]) for m in patternMatches if m[0] == "date"]
        patternMatches = [
            m for m in patternMatches
            if not (m[0] == "address" and any(overlaps(m[2], m[3], ds, de) for ds, de in date_ranges))
        ]

        # For name matches: resolve overlaps — keep only the longest so one
        # name does not receive multiple placeholders
        # (e.g. "Ida", "Dittrich", "Ida Dittrich" → keep only "Ida Dittrich").
        name_spans = [(m[2], m[3]) for m in patternMatches if m[0] == "name"]
        patternMatches = [
            m for m in patternMatches
            if m[0] != "name"
            or not any(
                overlaps(m[2], m[3], ns, ne) and (ne - ns) > (m[3] - m[2])
                for (ns, ne) in name_spans
            )
        ]

        # Process from right to left so earlier positions stay valid.
        for patternName, matchedText, start, end in reversed(patternMatches):
            # Skip if already a placeholder.
            if self._isPlaceholder(matchedText):
                continue

            # Skip if it contains placeholder characters.
            if '[' in matchedText or ']' in matchedText:
                continue

            # Skip blacklisted text (labels, salutations, etc.) — never neutralize.
            if matchedText.strip() in _NEUTRALIZATION_BLACKLIST:
                continue
            # Skip if the match contains any blacklisted word
            # (e.g. "2026 Reise" or "2026 Reisebeginn" from the address pattern).
            if any(w in _NEUTRALIZATION_BLACKLIST for w in matchedText.split()):
                continue
            # Skip phone matches that are clearly part of a price
            # (e.g. 128 in "128.56 CHF"): separator then two digits follow.
            if patternName == "phone" and end + 3 <= len(text):
                after = text[end:end + 3]
                if after[0] in ".," and after[1:3].isdigit():
                    continue

            if matchedText not in self.mapping:
                # New value: create a "[type.uuid]" placeholder for it.
                placeholderType = (
                    patternName if patternName in self._KNOWN_PLACEHOLDER_TYPES else 'data'
                )
                self.mapping[matchedText] = f"[{placeholderType}.{uuid.uuid4()}]"

            replacement = self.mapping[matchedText]
            text = text[:start] + replacement + text[end:]

        return text

    def _replaceCustomNames(self, text: str) -> str:
        """
        Replace custom names from the user list in text.

        Builds composite names (e.g. "Ida Dittrich") so full names get one
        UUID, not one per word.
        """
        names = [n.strip() for n in self.NamesToParse if n.strip()]
        if not names:
            return text

        # Add composite names: both "Ida Dittrich" and "Dittrich Ida" — the
        # nested loop visits both orderings of every pair.
        expanded = set(names)
        for n1 in names:
            for n2 in names:
                if n1 != n2:
                    expanded.add(f"{n1} {n2}")

        # One UUID per person: composites and their parts share the same UUID.
        # Also align with the DataPatterns mapping (step 1 may have already
        # replaced "Ida Dittrich").
        name_to_uuid: Dict[str, str] = {}
        for composite in sorted(expanded, key=len, reverse=True):
            if " " not in composite:
                continue
            parts = composite.split()
            parts_set = frozenset(parts)
            existing_uuid = next((name_to_uuid[p] for p in parts_set if p in name_to_uuid), None)
            if existing_uuid is None:
                existing_uuid = next(
                    (self.mapping[k] for k in (composite, *parts_set) if k in self.mapping),
                    None
                )
            if existing_uuid is None:
                existing_uuid = f"[name.{uuid.uuid4()}]"
            for p in parts_set:
                name_to_uuid[p] = existing_uuid
            name_to_uuid[composite] = existing_uuid
            if len(parts) == 2:
                name_to_uuid[f"{parts[1]} {parts[0]}"] = existing_uuid
        for n in names:
            if n not in name_to_uuid:
                name_to_uuid[n] = self.mapping.get(n) or f"[name.{uuid.uuid4()}]"
        self.mapping.update({k: v for k, v in name_to_uuid.items() if k not in self.mapping})

        # Collect ALL matches from all name patterns, then keep only the
        # longest per span to avoid triple replacement
        # ("Ida" + "Dittrich" + "Ida Dittrich" -> only "Ida Dittrich").
        all_matches: List[Tuple[str, int, int]] = []
        for name in sorted(expanded, key=len, reverse=True):
            if " " in name:
                parts = name.split()
                pattern_str = r"\b" + r"\s+".join(re.escape(p) for p in parts) + r"\b"
            else:
                pattern_str = r"\b" + re.escape(name) + r"\b"
            pattern = re.compile(pattern_str, re.IGNORECASE)
            for m in pattern.finditer(text):
                all_matches.append((m.group(), m.start(), m.end()))

        # Remove matches that overlap with a longer match (keep longest per span).
        def _overlaps(s1, e1, s2, e2):
            return s1 < e2 and s2 < e1

        def _contained_in_longer(start: int, end: int) -> bool:
            for _other_text, other_start, other_end in all_matches:
                if (other_start, other_end) == (start, end):
                    continue
                if _overlaps(start, end, other_start, other_end) and (other_end - other_start) > (end - start):
                    return True
            return False

        to_replace = [(t, s, e) for t, s, e in all_matches if not _contained_in_longer(s, e)]
        # Deduplicate per span (the same span may be hit by several patterns).
        to_replace = list({(s, e): (t, s, e) for t, s, e in to_replace}.values())

        # Replace from right to left to avoid position shifts.
        for matched_text, start, end in sorted(to_replace, key=lambda x: -x[1]):
            normalized = " ".join(matched_text.split())
            replacement = (
                self.mapping.get(matched_text)
                or self.mapping.get(normalized)
                or next((v for k, v in self.mapping.items() if " ".join(k.split()) == normalized), None)
                or next((v for k, v in self.mapping.items() if k.lower() == matched_text.lower()), None)
            )
            if not replacement:
                replacement = f"[name.{uuid.uuid4()}]"
                self.mapping[matched_text] = replacement
            text = text[:start] + replacement + text[end:]

        return text

    def processString(self, text: str) -> str:
        """
        Process a string by replacing patterns first, then custom names.

        Args:
            text: Text to process

        Returns:
            str: Processed text with replacements
        """
        # Already-anonymized values pass through untouched.
        if self._isPlaceholder(text):
            return text

        # Step 1: Replace pattern-based matches FIRST.
        text = self._replacePatternMatches(text)

        # Step 2: Replace custom names SECOND.
        text = self._replaceCustomNames(text)

        return text

    def processJsonValue(self, value: Any) -> Any:
        """
        Process a JSON value for anonymization, recursing into containers.

        Args:
            value: Value to process (str, dict, list, or any scalar)

        Returns:
            Any: Processed value; non-string scalars are returned unchanged
        """
        if isinstance(value, str):
            return self.processString(value)
        elif isinstance(value, dict):
            return {k: self.processJsonValue(v) for k, v in value.items()}
        elif isinstance(value, list):
            return [self.processJsonValue(item) for item in value]
        else:
            return value

    def getMapping(self) -> Dict[str, str]:
        """
        Get the current mapping of original values to placeholders.

        Returns:
            Dict[str, str]: A copy of the mapping dictionary (safe to mutate)
        """
        return self.mapping.copy()

    def clearMapping(self):
        """Clear the current mapping."""
        self.mapping.clear()
|