# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Common processing utilities for data anonymization
Shared functions and data structures
"""

import re
from dataclasses import dataclass
from typing import Dict, List, Any, Union, Optional

from pydantic import BaseModel

# Placeholder syntax "[type.id]": a lowercase type name, a dot, then an id made
# of lowercase hex digits and dashes. It is applied with ``fullmatch`` so that
# nothing (not even a trailing newline, which ``$`` would tolerate) may follow
# the closing bracket.
_PLACEHOLDER_RE = re.compile(r'\[([a-z]+)\.([a-f0-9-]+)\]')


@dataclass
class ProcessResult:
    """Result of content processing"""
    data: Any
    mapping: Dict[str, str]
    replaced_fields: List[str]
    processed_info: Dict[str, Any]  # Additional processing information


class NeutralizationAttribute(BaseModel):
    """Single attribute describing a replacement mapping."""
    original: str
    placeholder: str
    patternType: Optional[str] = None


class NeutralizationResult(BaseModel):
    """Unified result for all content types, suitable for API responses."""
    neutralized_text: str
    mapping: Dict[str, str]
    attributes: List[NeutralizationAttribute]
    processed_info: Dict[str, Any]


class CommonUtils:
    """Common utility functions for data processing"""

    @staticmethod
    def normalizeWhitespace(text: str) -> str:
        """
        Normalize whitespace in text

        Every run of whitespace (spaces, tabs, CR, LF, ...) is collapsed to a
        single space and leading/trailing whitespace is removed. Line breaks
        are therefore NOT preserved.

        Args:
            text: Text to normalize

        Returns:
            str: Normalized text
        """
        # '\s+' already consumes '\r' and '\n', so no separate line-ending
        # conversion is needed after this substitution.
        return re.sub(r'\s+', ' ', text).strip()

    @staticmethod
    def _isTableLine(line: str) -> bool:
        """
        Check if a line represents a table row

        A line counts as a table row when it is either a single
        "key: value" pair (exactly one colon) or two tab-separated cells
        (exactly one tab).

        Args:
            line: Line to check

        Returns:
            bool: True if line is a table row
        """
        isKeyValue = re.match(r'^\s*[^:]+:\s*[^:]+$', line)
        isTabSeparated = re.match(r'^\s*[^\t]+\t[^\t]+$', line)
        return bool(isKeyValue or isTabSeparated)

    @staticmethod
    def detectContentType(content: str) -> str:
        """
        Detect the type of content based on its structure

        The checks are purely structural heuristics and are applied in this
        order; the first one that matches wins:

        1. JSON   - starts/ends with '{'...'}' or '['...']'
        2. XML    - starts with '<' and ends with '>'
        3. CSV    - contains a comma and a newline, and each of the first
                    (up to) three lines contains a comma
        4. binary - longer than 100 characters and contains a NUL character
        5. text   - everything else (including empty content)

        Surrounding whitespace is stripped before the checks. Because of the
        ordering, content containing NUL characters is only reported as
        'binary' when none of the earlier checks matched.

        Args:
            content: Content to analyze

        Returns:
            str: Content type ('csv', 'json', 'xml', 'text', 'binary')
        """
        content = content.strip()

        # Check for JSON
        if content.startswith('{') and content.endswith('}'):
            return 'json'
        if content.startswith('[') and content.endswith(']'):
            return 'json'

        # Check for XML
        if content.startswith('<') and content.endswith('>'):
            return 'xml'

        # Check for CSV (has commas and newlines). A newline inside the
        # stripped content guarantees at least two lines, so only the leading
        # lines need to be inspected.
        if ',' in content and '\n' in content:
            lines = content.split('\n')
            if all(',' in line for line in lines[:3]):
                return 'csv'

        # Check for binary
        if len(content) > 100 and '\x00' in content:
            return 'binary'

        # Default to text
        return 'text'

    @staticmethod
    def mergeMappings(*mappings: Dict[str, str]) -> Dict[str, str]:
        """
        Merge multiple mapping dictionaries

        Mappings are applied left to right, so when the same key occurs in
        several mappings the value from the last one wins. The inputs are not
        modified.

        Args:
            *mappings: Mapping dictionaries to merge

        Returns:
            Dict[str, str]: Merged mapping dictionary
        """
        merged: Dict[str, str] = {}
        for mapping in mappings:
            merged.update(mapping)
        return merged

    @staticmethod
    def createPlaceholder(placeholderType: str, placeholderId: str) -> str:
        """
        Create a placeholder string in the format [type.uuid]

        The arguments are inserted as given; no validation is performed here.

        Args:
            placeholderType: Type of placeholder (email, phone, name, etc.)
            placeholderId: Unique identifier for the placeholder

        Returns:
            str: Formatted placeholder string
        """
        return f"[{placeholderType}.{placeholderId}]"

    @staticmethod
    def validatePlaceholder(placeholder: str) -> bool:
        """
        Validate if a string is a valid placeholder

        The whole string must have the form "[type.id]" where type consists of
        lowercase letters and id of lowercase hex digits and dashes. Trailing
        characters, including a trailing newline, make the string invalid.

        Args:
            placeholder: String to validate

        Returns:
            bool: True if valid placeholder
        """
        # Delegate so validation and extraction always agree on the syntax.
        return CommonUtils.extractPlaceholderInfo(placeholder) is not None

    @staticmethod
    def extractPlaceholderInfo(placeholder: str) -> Optional[tuple]:
        """
        Extract type and ID from a placeholder

        Args:
            placeholder: Placeholder string

        Returns:
            Optional[tuple]: (type, id) or None if invalid
        """
        match = _PLACEHOLDER_RE.fullmatch(placeholder)
        if match is None:
            return None
        return match.group(1), match.group(2)