gateway/modules/features/neutralization/serviceNeutralization/subParseString.py

296 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
String parsing and replacement utilities for data anonymization
Handles pattern matching and replacement for emails, phones, addresses, IDs and names
"""
import re
import uuid
from typing import Dict, List, Tuple, Any
from .subPatterns import DataPatterns, findPatternsInText
# Phrases or words that must never be neutralized (labels, Anrede, etc.)
_NEUTRALIZATION_BLACKLIST = frozenset({
"Für Sie", "Ihre Ansprechperson", "AXA 24", "General Agent",
"Your Contact", "Contact Person", "Bei Fragen", "Mit Freundlichen",
"Frau", "Herr", # Anrede
"Reise", "Reisebeginn", "Reiseende", "Vertragsbeginn", "Zahlbar",
"Versicherte", "Versicherungsnehmer", "Versicherung", "Insurance",
"Leistungen", "Basis", "Benefits", # Section labels
"Start", "Beginn", "Ende", "End", "trip", # Contract labels (Start of trip, End of trip, etc.)
"incomplete", "Application", "Complete", "Pending", # Form/status labels, not addresses
"Marketing", "Verkaufsstrategien", "Qualitätsmanagement", # Business terms, not addresses
"Ausbildungsstätte", "Realschule", # Institution types, not city names
# Ambiguous substrings match in Zurich, CHF, UID-Nr., websites, etc.
"CH", "DE", "FR", "IT", "Nr", "Nr.", "Nr:", "No", "No.", "No:",
"www", ".ch", ".com", ".org", ".net", "CHF",
# Labels that must never be neutralized
"Kontakt", "Kanzlei", "Telefon", "Matrikel-Nr", "Matrikel-Nr.", "Student ID", "Student-ID",
})
class StringParser:
"""Handles string parsing and replacement operations"""
def __init__(self, NamesToParse: List[str] = None):
"""
Initialize the string parser
Args:
NamesToParse: List of names to parse and replace (case-insensitive)
"""
self.data_patterns = DataPatterns.patterns
self.NamesToParse = NamesToParse or []
self.mapping = {}
def _isPlaceholder(self, text: str) -> bool:
"""
Check if text is already a placeholder in format [tag.uuid]
Args:
text: Text to check
Returns:
bool: True if text is a placeholder
"""
return bool(re.match(r'^\[[a-z]+\.[a-f0-9-]+\]$', text))
def _replacePatternMatches(self, text: str) -> str:
"""
Replace pattern-based matches (emails, phones, etc.) in text
Args:
text: Text to process
Returns:
str: Text with pattern matches replaced
"""
patternMatches = findPatternsInText(text, self.data_patterns)
# Exclude matches that are fully contained in a longer match (e.g. skip "2026" inside "17.02.2026")
def is_contained(m, all_matches):
for other in all_matches:
if other is m:
continue
if other[2] <= m[2] and m[3] <= other[3] and (other[3] - other[2]) > (m[3] - m[2]):
return True
return False
patternMatches = [m for m in patternMatches if not is_contained(m, patternMatches)]
# Deduplicate: keep one match per (start,end) same span can match multiple patterns
seen = set()
unique_matches = []
for m in patternMatches:
key = (m[2], m[3])
if key not in seen:
seen.add(key)
unique_matches.append(m)
patternMatches = unique_matches
# Exclude address matches that overlap with date matches (e.g. "2026 den" overlaps "17.02.2026")
def overlaps(a_start, a_end, b_start, b_end):
return a_start < b_end and b_start < a_end
date_ranges = [(m[2], m[3]) for m in patternMatches if m[0] == "date"]
patternMatches = [
m for m in patternMatches
if not (m[0] == "address" and any(overlaps(m[2], m[3], ds, de) for ds, de in date_ranges))
]
# For name matches: resolve overlaps keep only longest to avoid multiple placeholders for one name
# (e.g. "Ida", "Dittrich", "Ida Dittrich" → keep only "Ida Dittrich" with one UUID)
name_matches = [(m, m[3] - m[2]) for m in patternMatches if m[0] == "name"]
name_spans = [(m[2], m[3]) for m, _ in name_matches]
patternMatches = [
m for m in patternMatches
if m[0] != "name"
or not any(
overlaps(m[2], m[3], ns, ne) and (ne - ns) > (m[3] - m[2])
for (ns, ne) in name_spans
)
]
# Process from right to left to avoid position shifts
for patternName, matchedText, start, end in reversed(patternMatches):
# Skip if already a placeholder
if self._isPlaceholder(matchedText):
continue
# Skip if contains placeholder characters
if '[' in matchedText or ']' in matchedText:
continue
# Skip blacklisted text (labels, Anrede, etc.) never neutralize
if matchedText.strip() in _NEUTRALIZATION_BLACKLIST:
continue
# Skip if match contains any blacklisted word (e.g. "2026 Reise" or "2026 Reisebeginn" from address pattern)
if any(w in _NEUTRALIZATION_BLACKLIST for w in matchedText.split()):
continue
# Skip phone matches that are clearly part of a price (e.g. 128 in 128.56 CHF)
if patternName == "phone" and end + 3 <= len(text):
after = text[end : end + 3]
if (after[0] in ".," and len(after) >= 3 and after[1:3].isdigit()):
continue
if matchedText not in self.mapping:
# Generate a UUID for the placeholder
placeholderId = str(uuid.uuid4())
# Create placeholder in format [type.uuid]
typeMapping = {
'email': 'email',
'phone': 'phone',
'address': 'address',
'date': 'date',
'policy': 'policy',
'name': 'name',
'id': 'id',
'iban': 'iban',
'ssn': 'ssn',
}
placeholderType = typeMapping.get(patternName, 'data')
self.mapping[matchedText] = f"[{placeholderType}.{placeholderId}]"
replacement = self.mapping[matchedText]
text = text[:start] + replacement + text[end:]
return text
def _replaceCustomNames(self, text: str) -> str:
"""
Replace custom names from the user list in text.
Builds composite names (e.g. "Ida Dittrich") so full names get one UUID, not one per word.
"""
names = [n.strip() for n in self.NamesToParse if n.strip()]
if not names:
return text
# Add composite names: "Ida Dittrich", "Dittrich Ida" when both are in list
expanded = set(names)
for i, n1 in enumerate(names):
for n2 in names:
if n1 != n2:
expanded.add(f"{n1} {n2}")
expanded.add(f"{n2} {n1}")
# One UUID per person: composites and their parts share same UUID
# Also align with DataPatterns mapping (step 1 may have already replaced "Ida Dittrich")
name_to_uuid: Dict[str, str] = {}
for composite in sorted(expanded, key=len, reverse=True):
if " " not in composite:
continue
parts = composite.split()
parts_set = frozenset(parts)
existing_uuid = next((name_to_uuid[p] for p in parts_set if p in name_to_uuid), None)
if existing_uuid is None:
existing_uuid = next(
(self.mapping[k] for k in (composite, *parts_set) if k in self.mapping),
None
)
if existing_uuid is None:
existing_uuid = f"[name.{uuid.uuid4()}]"
for p in parts_set:
name_to_uuid[p] = existing_uuid
name_to_uuid[composite] = existing_uuid
if len(parts) == 2:
name_to_uuid[f"{parts[1]} {parts[0]}"] = existing_uuid
for n in names:
if n not in name_to_uuid:
name_to_uuid[n] = self.mapping.get(n) or f"[name.{uuid.uuid4()}]"
self.mapping.update({k: v for k, v in name_to_uuid.items() if k not in self.mapping})
# Collect ALL matches from all name patterns, then keep only longest per span to avoid
# triple replacement ("Ida" + "Dittrich" + "Ida Dittrich" -> only "Ida Dittrich")
all_matches: List[Tuple[str, int, int]] = []
for name in sorted(expanded, key=len, reverse=True):
if " " in name:
parts = name.split()
pattern_str = r"\b" + r"\s+".join(re.escape(p) for p in parts) + r"\b"
else:
pattern_str = r"\b" + re.escape(name) + r"\b"
pattern = re.compile(pattern_str, re.IGNORECASE)
for m in pattern.finditer(text):
all_matches.append((m.group(), m.start(), m.end()))
# Remove matches that overlap with a longer match (keep longest per span)
def _overlaps(s1, e1, s2, e2):
return s1 < e2 and s2 < e1
def _contained_in_longer(matched_text: str, start: int, end: int) -> bool:
for other_text, os, oe in all_matches:
if (os, oe) == (start, end):
continue
if _overlaps(start, end, os, oe) and (oe - os) > (end - start):
return True
return False
to_replace = [(t, s, e) for t, s, e in all_matches if not _contained_in_longer(t, s, e)]
to_replace = list({(s, e): (t, s, e) for t, s, e in to_replace}.values())
# Replace from right to left to avoid position shift
for matched_text, start, end in sorted(to_replace, key=lambda x: -x[1]):
normalized = " ".join(matched_text.split())
replacement = (
self.mapping.get(matched_text)
or self.mapping.get(normalized)
or next((v for k, v in self.mapping.items() if " ".join(k.split()) == normalized), None)
or next((v for k, v in self.mapping.items() if k.lower() == matched_text.lower()), None)
)
if not replacement:
replacement = f"[name.{uuid.uuid4()}]"
self.mapping[matched_text] = replacement
text = text[:start] + replacement + text[end:]
return text
def processString(self, text: str) -> str:
"""
Process a string by replacing patterns first, then custom names
Args:
text: Text to process
Returns:
str: Processed text with replacements
"""
if self._isPlaceholder(text):
return text
# Step 1: Replace pattern-based matches FIRST
text = self._replacePatternMatches(text)
# Step 2: Replace custom names SECOND
text = self._replaceCustomNames(text)
return text
def processJsonValue(self, value: Any) -> Any:
"""
Process a JSON value for anonymization
Args:
value: Value to process
Returns:
Any: Processed value
"""
if isinstance(value, str):
return self.processString(value)
elif isinstance(value, dict):
return {k: self.processJsonValue(v) for k, v in value.items()}
elif isinstance(value, list):
return [self.processJsonValue(item) for item in value]
else:
return value
def getMapping(self) -> Dict[str, str]:
"""
Get the current mapping of original values to placeholders
Returns:
Dict[str, str]: Mapping dictionary
"""
return self.mapping.copy()
def clearMapping(self):
"""Clear the current mapping"""
self.mapping.clear()