gateway/modules/neutralizer/subProcessCommon.py
2025-09-22 00:39:15 +02:00

143 lines
4 KiB
Python

"""
Common processing utilities for data anonymization
Shared functions and data structures
"""
import re
from typing import Dict, List, Any, Union, Optional
from dataclasses import dataclass
@dataclass
class ProcessResult:
    """Container for the outcome of one content-processing run.

    Attributes:
        data: The processed payload; its concrete type is not constrained
            here.
        mapping: String-to-string mapping collected during processing
            (presumably original value <-> placeholder; confirm the
            direction against the producers of this object).
        replaced_fields: Names of the fields that were replaced.
        processed_info: Additional processing information.
    """

    # Processed payload (any type).
    data: Any
    # String-to-string replacement mapping.
    mapping: Dict[str, str]
    # Names of the fields that were replaced.
    replaced_fields: List[str]
    # Additional processing information.
    processed_info: Dict[str, Any]
class CommonUtils:
    """Common utility functions for data processing.

    Every helper is a stateless ``@staticmethod``; the class only serves as
    a namespace.
    """

    # Placeholder shape shared by validate_placeholder() and
    # extract_placeholder_info(): "[<type>.<id>]" where <type> is lowercase
    # ASCII letters and <id> consists of lowercase hex digits and dashes.
    # The pattern carries no anchors because it is always applied with
    # fullmatch(); a "^...$" pattern used with match() would also accept a
    # trailing newline.
    _PLACEHOLDER_RE = re.compile(r'\[([a-z]+)\.([a-f0-9-]+)\]')

    @staticmethod
    def normalize_whitespace(text: str) -> str:
        """
        Normalize whitespace in text

        Every run of whitespace (spaces, tabs, CR, LF, ...) is collapsed to
        a single space and leading/trailing whitespace is removed, so the
        result is always a single line.

        Args:
            text: Text to normalize
        Returns:
            str: Normalized text
        """
        # No separate CR/LF normalization is needed: \s already matches
        # both, so no line break survives the substitution.
        return re.sub(r'\s+', ' ', text).strip()

    @staticmethod
    def is_table_line(line: str) -> bool:
        """
        Check if a line represents a table row

        A line counts as a table row when it is either a ``key: value``
        pair containing exactly one colon, or two tab-separated cells
        (leading whitespace is allowed in both forms).

        Args:
            line: Line to check
        Returns:
            bool: True if line is a table row
        """
        is_key_value = re.match(r'^\s*[^:]+:\s*[^:]+$', line)
        is_tab_separated = re.match(r'^\s*[^\t]+\t[^\t]+$', line)
        return bool(is_key_value or is_tab_separated)

    @staticmethod
    def detect_content_type(content: str) -> str:
        """
        Detect the type of content based on its structure

        This is a cheap heuristic that only looks at delimiters; the
        content is never actually parsed. Checks run in this order and the
        first hit wins: JSON, XML, CSV, binary, then the 'text' fallback.

        Args:
            content: Content to analyze
        Returns:
            str: Content type ('csv', 'json', 'xml', 'text', 'binary')
        """
        stripped = content.strip()

        # JSON: wrapped in matching object or array delimiters.
        if (stripped.startswith('{') and stripped.endswith('}')) or (
            stripped.startswith('[') and stripped.endswith(']')
        ):
            return 'json'

        # XML: starts and ends with angle brackets.
        if stripped.startswith('<') and stripped.endswith('>'):
            return 'xml'

        # CSV: multi-line content whose first (up to) three lines all
        # contain a comma.
        if ',' in stripped and '\n' in stripped:
            head = stripped.split('\n')[:3]
            if all(',' in line for line in head):
                return 'csv'

        # Binary: longer content containing a NUL character.
        # NOTE(review): because this check runs last, NUL-containing content
        # that also satisfies an earlier check is reported as that type.
        if len(stripped) > 100 and '\x00' in stripped:
            return 'binary'

        # Default to text
        return 'text'

    @staticmethod
    def merge_mappings(*mappings: Dict[str, str]) -> Dict[str, str]:
        """
        Merge multiple mapping dictionaries

        Mappings are applied left to right, so for duplicate keys the value
        from the last mapping wins. The inputs are not modified.

        Args:
            *mappings: Mapping dictionaries to merge
        Returns:
            Dict[str, str]: Merged mapping dictionary
        """
        merged: Dict[str, str] = {}
        for mapping in mappings:
            merged.update(mapping)
        return merged

    @staticmethod
    def create_placeholder(placeholder_type: str, placeholder_id: str) -> str:
        """
        Create a placeholder string in the format [type.uuid]

        The arguments are interpolated as-is; nothing is validated here, so
        a result only passes validate_placeholder() when the type is
        lowercase letters and the id is lowercase hex digits/dashes.

        Args:
            placeholder_type: Type of placeholder (email, phone, name, etc.)
            placeholder_id: Unique identifier for the placeholder
        Returns:
            str: Formatted placeholder string
        """
        return f"[{placeholder_type}.{placeholder_id}]"

    @staticmethod
    def validate_placeholder(placeholder: str) -> bool:
        """
        Validate if a string is a valid placeholder

        The whole string must be ``[<type>.<id>]`` with a lowercase-letter
        type and an id made of lowercase hex digits and dashes. Surrounding
        characters, including a trailing newline, make it invalid.

        Args:
            placeholder: String to validate
        Returns:
            bool: True if valid placeholder
        """
        return CommonUtils._PLACEHOLDER_RE.fullmatch(placeholder) is not None

    @staticmethod
    def extract_placeholder_info(placeholder: str) -> Optional[tuple]:
        """
        Extract type and ID from a placeholder

        Uses the same pattern as validate_placeholder(), so a string yields
        a result here exactly when it validates there.

        Args:
            placeholder: Placeholder string
        Returns:
            Optional[tuple]: (type, id) or None if invalid
        """
        match = CommonUtils._PLACEHOLDER_RE.fullmatch(placeholder)
        if match is None:
            return None
        return match.group(1), match.group(2)