159 lines
4.5 KiB
Python
159 lines
4.5 KiB
Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
|
|
"""
|
|
Common processing utilities for data anonymization.
Shared functions and data structures.
|
|
"""
|
|
|
|
import re
|
|
from typing import Dict, List, Any, Union, Optional
|
|
from pydantic import BaseModel
|
|
from dataclasses import dataclass
|
|
|
|
@dataclass
class ProcessResult:
    """Result of content processing.

    Plain data container; it performs no validation of its own.

    Attributes:
        data: The processed content. Its concrete type is not fixed here
            (annotated as ``Any``).
        mapping: String-to-string replacement mapping produced during
            processing.
            NOTE(review): the direction (original -> placeholder or the
            reverse) is not established in this file - confirm with callers.
        replaced_fields: Names of the fields that were replaced.
        processed_info: Additional processing information.
    """

    data: Any
    mapping: Dict[str, str]
    replaced_fields: List[str]
    processed_info: Dict[str, Any]  # Additional processing information
|
|
|
|
class NeutralizationAttribute(BaseModel):
    """Single attribute describing a replacement mapping.

    Attributes:
        original: The original string.
        placeholder: The placeholder string associated with ``original``.
        patternType: Optional label for the kind of pattern involved;
            defaults to ``None`` when not supplied.
            NOTE(review): the set of allowed values is not defined in this
            file - confirm with the code that builds these objects.
    """

    original: str
    placeholder: str
    patternType: Optional[str] = None
|
|
|
|
class NeutralizationResult(BaseModel):
    """Unified result for all content types, suitable for API responses.

    Attributes:
        neutralized_text: The text after neutralization.
        mapping: String-to-string replacement mapping.
            NOTE(review): the direction (original -> placeholder or the
            reverse) is not established in this file - confirm with callers.
        attributes: One ``NeutralizationAttribute`` per described replacement.
        processed_info: Additional processing information (free-form dict).
    """

    neutralized_text: str
    mapping: Dict[str, str]
    attributes: List[NeutralizationAttribute]
    processed_info: Dict[str, Any]
|
|
|
|
class CommonUtils:
    """Common utility functions for data processing.

    Every member is a ``@staticmethod``; the class only serves as a namespace
    and holds no instance state.
    """

    # A placeholder has the form "[<type>.<id>]" where <type> is one or more
    # lowercase ASCII letters and <id> is one or more characters out of
    # "0-9", "a-f" and "-". The pattern is applied with ``fullmatch`` because
    # ``re.match(... + '$')`` would also accept one trailing newline.
    _PLACEHOLDER_PATTERN = re.compile(r'\[([a-z]+)\.([a-f0-9-]+)\]')

    @staticmethod
    def normalizeWhitespace(text: str) -> str:
        """
        Collapse every run of whitespace into one space and trim both ends.

        Line breaks (LF, CR, CRLF) are whitespace as well, so they are
        replaced by a single space like any other whitespace run.

        Args:
            text: Text to normalize

        Returns:
            str: Normalized text
        """
        # No separate line-ending normalization is needed: ``\s+`` already
        # consumes CR and LF, so nothing would be left for it to replace.
        return re.sub(r'\s+', ' ', text).strip()

    @staticmethod
    def _isTableLine(line: str) -> bool:
        """
        Check if a line represents a table row.

        A line qualifies when it has one of these two shapes:

        * key/value: exactly one colon in the line, with at least one
          character on each side of it
        * two columns: optional leading whitespace, then two non-empty cells
          separated by a single tab (no further tabs)

        Args:
            line: Line to check

        Returns:
            bool: True if line is a table row
        """
        if re.match(r'^\s*[^:]+:\s*[^:]+$', line):
            return True
        return re.match(r'^\s*[^\t]+\t[^\t]+$', line) is not None

    @staticmethod
    def detectContentType(content: str) -> str:
        """
        Detect the type of content based on its structure.

        The content is stripped of surrounding whitespace and then tested in
        this order; the first rule that applies wins:

        1. ``'json'``   - starts with ``{`` and ends with ``}``, or starts
           with ``[`` and ends with ``]``
        2. ``'xml'``    - starts with ``<`` and ends with ``>``
        3. ``'csv'``    - contains a newline, and each of the first (up to)
           three lines contains a comma
        4. ``'binary'`` - longer than 100 characters and contains a NUL
           character
        5. ``'text'``   - anything else (including empty content)

        These are cheap heuristics: nothing is actually parsed, so e.g.
        malformed JSON that is wrapped in braces is still reported as 'json'.

        Args:
            content: Content to analyze

        Returns:
            str: Content type ('csv', 'json', 'xml', 'text', 'binary')
        """
        stripped = content.strip()

        # Check for JSON (object or array delimiters)
        looks_like_object = stripped.startswith('{') and stripped.endswith('}')
        looks_like_array = stripped.startswith('[') and stripped.endswith(']')
        if looks_like_object or looks_like_array:
            return 'json'

        # Check for XML
        if stripped.startswith('<') and stripped.endswith('>'):
            return 'xml'

        # Check for CSV (has commas and newlines)
        if ',' in stripped and '\n' in stripped:
            # Only the first three lines are inspected, so stop splitting
            # after the third newline instead of splitting the whole content.
            first_lines = stripped.split('\n', 3)[:3]
            if all(',' in line for line in first_lines):
                return 'csv'

        # Check for binary
        if len(stripped) > 100 and '\x00' in stripped:
            return 'binary'

        # Default to text
        return 'text'

    @staticmethod
    def mergeMappings(*mappings: Dict[str, str]) -> Dict[str, str]:
        """
        Merge multiple mapping dictionaries into a new dictionary.

        The inputs are not modified. When the same key occurs in several
        mappings, the value from the later argument wins.

        Args:
            *mappings: Mapping dictionaries to merge

        Returns:
            Dict[str, str]: Merged mapping dictionary (empty if no mappings
            were passed)
        """
        merged: Dict[str, str] = {}
        for mapping in mappings:
            merged.update(mapping)
        return merged

    @staticmethod
    def createPlaceholder(placeholderType: str, placeholderId: str) -> str:
        """
        Create a placeholder string in the format [type.uuid].

        The arguments are inserted as given, without validation. The result
        therefore only passes ``validatePlaceholder`` when ``placeholderType``
        consists of lowercase ASCII letters and ``placeholderId`` of the
        characters ``0-9``, ``a-f`` and ``-``.

        Args:
            placeholderType: Type of placeholder (email, phone, name, etc.)
            placeholderId: Unique identifier for the placeholder

        Returns:
            str: Formatted placeholder string
        """
        return f"[{placeholderType}.{placeholderId}]"

    @staticmethod
    def validatePlaceholder(placeholder: str) -> bool:
        """
        Validate if a string is a valid placeholder.

        The whole string must be ``[<type>.<id>]``; surrounding characters,
        including a trailing newline, make it invalid.

        Args:
            placeholder: String to validate

        Returns:
            bool: True if valid placeholder
        """
        return CommonUtils._PLACEHOLDER_PATTERN.fullmatch(placeholder) is not None

    @staticmethod
    def extractPlaceholderInfo(placeholder: str) -> Optional[tuple]:
        """
        Extract type and ID from a placeholder.

        Uses the same pattern as ``validatePlaceholder``, so a string yields a
        result here exactly when it validates there.

        Args:
            placeholder: Placeholder string

        Returns:
            Optional[tuple]: (type, id) or None if invalid
        """
        match = CommonUtils._PLACEHOLDER_PATTERN.fullmatch(placeholder)
        if match is None:
            return None
        return match.group(1), match.group(2)
|