162 lines
5.4 KiB
Python
162 lines
5.4 KiB
Python
"""
|
|
String parsing and replacement utilities for data anonymization
|
|
Handles pattern matching and replacement for emails, phones, addresses, IDs and names
|
|
"""
|
|
|
|
import re
|
|
import uuid
|
|
from typing import Dict, List, Tuple, Any
|
|
from modules.neutralizer.subPatterns import DataPatterns, find_patterns_in_text
|
|
|
|
class StringParser:
    """Replace sensitive substrings with stable ``[type.uuid]`` placeholders.

    Two kinds of content are replaced:

    * pattern-based matches reported by ``find_patterns_in_text`` (emails,
      phones, addresses, IDs, ...), and
    * user-supplied names, matched case-insensitively as whole words.

    Every distinct matched string is assigned one placeholder, stored in
    ``self.mapping`` (original text -> placeholder), so repeated occurrences
    of the same text receive the same placeholder. Keys are the text exactly
    as it appeared, so ``"Alice"`` and ``"ALICE"`` get separate entries.

    Text that is already inside a placeholder is never rewritten; this keeps
    previously generated placeholders intact when a string is processed more
    than once or when a listed name happens to equal a placeholder tag or a
    fragment of a UUID.
    """

    # Matches one placeholder anywhere inside a longer string.
    _PLACEHOLDER_RE = re.compile(r'\[[a-z]+\.[a-f0-9-]+\]')

    # Pattern names that get their own placeholder tag; any other pattern
    # name falls back to the generic 'data' tag.
    _PLACEHOLDER_TYPES = {
        'email': 'email',
        'phone': 'phone',
        'address': 'address',
        'id': 'id',
    }

    def __init__(self, names_to_parse: List[str] = None):
        """
        Initialize the string parser.

        Args:
            names_to_parse: List of names to parse and replace
                (case-insensitive). ``None`` or an empty list means no
                custom names are replaced.
        """
        # Pattern definitions come from the project-level DataPatterns class;
        # they are passed through unchanged to find_patterns_in_text.
        self.data_patterns = DataPatterns.patterns
        self.names_to_parse = names_to_parse or []
        # Original matched text -> placeholder string.
        self.mapping: Dict[str, str] = {}

    def is_placeholder(self, text: str) -> bool:
        """
        Check if text is, in its entirety, a placeholder in format [tag.uuid].

        Args:
            text: Text to check

        Returns:
            bool: True if the whole string is exactly one placeholder
        """
        # fullmatch (rather than match with '$') so that a trailing newline
        # does not make a longer string pass as a bare placeholder.
        return self._PLACEHOLDER_RE.fullmatch(text) is not None

    def _placeholder_spans(self, text: str) -> List[Tuple[int, int]]:
        """Return the (start, end) span of every placeholder found in text."""
        return [found.span() for found in self._PLACEHOLDER_RE.finditer(text)]

    @staticmethod
    def _overlaps(start: int, end: int, spans: List[Tuple[int, int]]) -> bool:
        """Return True if the half-open range [start, end) intersects any span."""
        return any(
            start < span_end and span_start < end
            for span_start, span_end in spans
        )

    @staticmethod
    def _name_pattern(name: str) -> "re.Pattern":
        """Build the case-insensitive whole-word regex for an already-stripped name."""
        # For an edge that is a word character, a negative lookaround is
        # exactly equivalent to \b. For an edge that is punctuation (e.g. the
        # final '.' of "Smith Jr."), \b would demand an adjacent word
        # character and the name could never match before whitespace or at
        # the end of the text, so no boundary is required on that side.
        prefix = r'(?<!\w)' if re.match(r'\w', name[0]) else ''
        suffix = r'(?!\w)' if re.match(r'\w', name[-1]) else ''
        return re.compile(prefix + re.escape(name) + suffix, re.IGNORECASE)

    def replace_pattern_matches(self, text: str) -> str:
        """
        Replace pattern-based matches (emails, phones, etc.) in text.

        Matches that contain bracket characters, that fall inside an existing
        placeholder, or that overlap a match already replaced in this call
        are left untouched.

        Args:
            text: Text to process

        Returns:
            str: Text with pattern matches replaced
        """
        pattern_matches = find_patterns_in_text(text, self.data_patterns)
        protected = self._placeholder_spans(text)

        # Splice from right to left so the offsets of matches further left
        # stay valid. Sort explicitly instead of relying on the order in
        # which the helper reports its matches.
        ordered = sorted(pattern_matches, key=lambda hit: hit[2], reverse=True)

        leftmost_replaced = len(text)
        for pattern_name, matched_text, start, end in ordered:
            # Overlaps a match to its right that was already replaced; its
            # offsets no longer describe the current text.
            if end > leftmost_replaced:
                continue

            # Skip anything that is, or contains part of, a placeholder.
            if '[' in matched_text or ']' in matched_text:
                continue

            # Skip hits that sit inside an existing placeholder (for example
            # digits of its UUID), which contain no brackets themselves.
            if self._overlaps(start, end, protected):
                continue

            replacement = self.mapping.get(matched_text)
            if replacement is None:
                # Create placeholder in format [type.uuid]
                placeholder_type = self._PLACEHOLDER_TYPES.get(pattern_name, 'data')
                replacement = f"[{placeholder_type}.{uuid.uuid4()}]"
                self.mapping[matched_text] = replacement

            text = text[:start] + replacement + text[end:]
            leftmost_replaced = start

        return text

    def replace_custom_names(self, text: str) -> str:
        """
        Replace custom names from the user list in text.

        Names are matched case-insensitively as whole words, in the order
        they appear in ``self.names_to_parse``. Occurrences located inside a
        placeholder are not replaced.

        Args:
            text: Text to process

        Returns:
            str: Text with custom names replaced
        """
        for name in self.names_to_parse:
            needle = name.strip()
            if not needle:
                continue

            pattern = self._name_pattern(needle)

            # Recomputed for every name because the previous name may have
            # inserted new placeholders into the text.
            protected = self._placeholder_spans(text)

            # Process from right to left to avoid position shifts.
            for match in reversed(list(pattern.finditer(text))):
                start, end = match.span()
                if self._overlaps(start, end, protected):
                    continue

                matched_text = match.group()
                replacement = self.mapping.get(matched_text)
                if replacement is None:
                    replacement = f"[name.{uuid.uuid4()}]"
                    self.mapping[matched_text] = replacement

                text = text[:start] + replacement + text[end:]

        return text

    def process_string(self, text: str) -> str:
        """
        Process a string by replacing patterns first, then custom names.

        Args:
            text: Text to process

        Returns:
            str: Processed text with replacements; a string that is already
            exactly one placeholder is returned unchanged
        """
        if self.is_placeholder(text):
            return text

        # Step 1: Replace pattern-based matches FIRST
        text = self.replace_pattern_matches(text)

        # Step 2: Replace custom names SECOND
        text = self.replace_custom_names(text)

        return text

    def process_json_value(self, value: Any) -> Any:
        """
        Process a JSON value for anonymization.

        Strings are anonymized; dicts and lists are rebuilt recursively
        (dict keys are kept as they are, only values are processed); every
        other value is returned unchanged.

        Args:
            value: Value to process

        Returns:
            Any: Processed value
        """
        if isinstance(value, str):
            return self.process_string(value)
        if isinstance(value, dict):
            return {k: self.process_json_value(v) for k, v in value.items()}
        if isinstance(value, list):
            return [self.process_json_value(item) for item in value]
        return value

    def get_mapping(self) -> Dict[str, str]:
        """
        Get the current mapping of original values to placeholders.

        Returns:
            Dict[str, str]: A shallow copy of the mapping dictionary, so
            changes made by the caller do not affect the parser
        """
        return self.mapping.copy()

    def clear_mapping(self):
        """Clear the current mapping"""
        self.mapping.clear()
|