164 lines
5.4 KiB
Python
164 lines
5.4 KiB
Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
String parsing and replacement utilities for data anonymization.

Handles pattern matching and replacement for emails, phones, addresses, IDs, and names.
"""
|
|
|
|
import re
import uuid
from typing import Any, Dict, List, Optional, Tuple

from modules.services.serviceNeutralization.subPatterns import DataPatterns, findPatternsInText
|
|
|
|
class StringParser:
    """Replace sensitive substrings with stable ``[tag.uuid]`` placeholders.

    Two kinds of replacement are applied, in this order:

    1. Pattern-based matches reported by ``findPatternsInText`` (emails,
       phones, addresses, ids, ...).
    2. Caller-supplied names, matched case-insensitively as whole words.

    Every distinct original string is assigned exactly one placeholder, kept
    in ``self.mapping`` (original text -> placeholder), so repeated
    occurrences are replaced consistently for the lifetime of the mapping.
    """

    # A placeholder occurring anywhere inside a longer text.
    _EMBEDDED_PLACEHOLDER_RE = re.compile(r'\[[a-z]+\.[a-f0-9-]+\]')
    # A text that consists of a single placeholder and nothing else.
    _WHOLE_PLACEHOLDER_RE = re.compile(r'^\[[a-z]+\.[a-f0-9-]+\]$')
    # Pattern names that have their own placeholder tag; any other pattern
    # name falls back to the generic 'data' tag.
    _PATTERN_TAGS = {
        'email': 'email',
        'phone': 'phone',
        'address': 'address',
        'id': 'id',
    }

    def __init__(self, NamesToParse: Optional[List[str]] = None):
        """
        Initialize the string parser.

        Args:
            NamesToParse: List of names to parse and replace
                (case-insensitive). ``None`` or an empty list disables name
                replacement.
        """
        self.data_patterns = DataPatterns.patterns
        self.NamesToParse = NamesToParse or []
        # original text -> placeholder
        self.mapping: Dict[str, str] = {}

    def _isPlaceholder(self, text: str) -> bool:
        """
        Check if text is already a placeholder in format [tag.uuid].

        Args:
            text: Text to check

        Returns:
            bool: True if the whole text is a single placeholder
        """
        return bool(self._WHOLE_PLACEHOLDER_RE.match(text))

    def _placeholderFor(self, original: str, tag: str) -> str:
        """
        Return the placeholder for ``original``, creating it on first use.

        Args:
            original: The exact matched text to anonymize
            tag: Placeholder tag used only when a new placeholder is created

        Returns:
            str: Placeholder in format [tag.uuid]
        """
        placeholder = self.mapping.get(original)
        if placeholder is None:
            placeholder = f"[{tag}.{uuid.uuid4()}]"
            self.mapping[original] = placeholder
        return placeholder

    @staticmethod
    def _compileNamePattern(name: str):
        """
        Build the case-insensitive whole-word regex for one name.

        ``\\b`` is only added on a side where the name has a word character.
        With an unconditional ``\\b``, a name such as "Dr." could never match
        before whitespace or at the end of the text, because there is no
        word boundary between "." and a non-word character.

        Args:
            name: Non-empty, already stripped name

        Returns:
            Compiled regular expression for the name
        """
        leading = r'\b' if re.match(r'\w', name[0]) else ''
        trailing = r'\b' if re.match(r'\w', name[-1]) else ''
        return re.compile(leading + re.escape(name) + trailing, re.IGNORECASE)

    def _replacePatternMatches(self, text: str) -> str:
        """
        Replace pattern-based matches (emails, phones, etc.) in text.

        Args:
            text: Text to process

        Returns:
            str: Text with pattern matches replaced
        """
        patternMatches = findPatternsInText(text, self.data_patterns)

        # Process from right to left so earlier offsets stay valid.
        # NOTE(review): this relies on findPatternsInText returning
        # non-overlapping matches ordered by start offset - confirm.
        for patternName, matchedText, start, end in reversed(patternMatches):
            # Anything containing a bracket is (part of) a placeholder;
            # this also covers matches that are a complete placeholder.
            if '[' in matchedText or ']' in matchedText:
                continue

            tag = self._PATTERN_TAGS.get(patternName, 'data')
            replacement = self._placeholderFor(matchedText, tag)
            text = text[:start] + replacement + text[end:]

        return text

    def _replaceCustomNames(self, text: str) -> str:
        """
        Replace custom names from the user list in text.

        Names are matched case-insensitively as whole words. Each distinct
        spelling found in the text (e.g. "Alice" vs "alice") gets its own
        placeholder. Matches that overlap a placeholder already present in
        the text are left alone, so a name like "email" or a hex-like name
        cannot corrupt ``[email.<uuid>]``.

        Args:
            text: Text to process

        Returns:
            str: Text with custom names replaced
        """
        for name in self.NamesToParse:
            cleaned = name.strip()
            if not cleaned:
                continue

            pattern = self._compileNamePattern(cleaned)

            # Recomputed per name: the previous name may have inserted new
            # placeholders into the text.
            protected = [
                found.span()
                for found in self._EMBEDDED_PLACEHOLDER_RE.finditer(text)
            ]

            # Right to left, so offsets of the remaining matches and of the
            # protected spans to their left are unaffected by replacements.
            for match in reversed(list(pattern.finditer(text))):
                start, end = match.span()
                insidePlaceholder = any(
                    spanStart < end and start < spanEnd
                    for spanStart, spanEnd in protected
                )
                if insidePlaceholder:
                    continue

                replacement = self._placeholderFor(match.group(), 'name')
                text = text[:start] + replacement + text[end:]

        return text

    def processString(self, text: str) -> str:
        """
        Process a string by replacing patterns first, then custom names.

        Args:
            text: Text to process

        Returns:
            str: Processed text with replacements; returned unchanged when
                the text is already a single placeholder
        """
        if self._isPlaceholder(text):
            return text

        # Step 1: Replace pattern-based matches FIRST
        text = self._replacePatternMatches(text)

        # Step 2: Replace custom names SECOND
        text = self._replaceCustomNames(text)

        return text

    def processJsonValue(self, value: Any) -> Any:
        """
        Process a JSON value for anonymization.

        Strings are anonymized; dicts and lists are rebuilt recursively
        (dict keys are kept as they are); every other value is returned
        unchanged.

        Args:
            value: Value to process

        Returns:
            Any: Processed value
        """
        if isinstance(value, str):
            return self.processString(value)
        if isinstance(value, dict):
            return {k: self.processJsonValue(v) for k, v in value.items()}
        if isinstance(value, list):
            return [self.processJsonValue(item) for item in value]
        return value

    def getMapping(self) -> Dict[str, str]:
        """
        Get the current mapping of original values to placeholders.

        Returns:
            Dict[str, str]: Shallow copy of the mapping dictionary
        """
        return self.mapping.copy()

    def clearMapping(self):
        """Clear the current mapping."""
        self.mapping.clear()
|