gateway/modules/features/neutralizer/serviceNeutralization/subProcessCommon.py
2026-01-22 17:00:29 +01:00

159 lines
4.5 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Common processing utilities for data anonymization.

Shared functions and data structures.
"""
import re
from typing import Dict, List, Any, Union, Optional
from pydantic import BaseModel
from dataclasses import dataclass
@dataclass
class ProcessResult:
    """Outcome of a content-processing run.

    Attributes:
        data: The processed content; its type is not constrained here.
        mapping: String-to-string replacement mapping.
        replaced_fields: Strings identifying what was replaced
            (presumably field names - confirm against the producers).
        processed_info: Additional processing information.
    """

    data: Any
    mapping: Dict[str, str]
    replaced_fields: List[str]
    processed_info: Dict[str, Any]
class NeutralizationAttribute(BaseModel):
    """One replacement entry: an original value and the placeholder used for it.

    Attributes:
        original: The original string.
        placeholder: The placeholder string associated with ``original``.
        patternType: Optional pattern type label; defaults to ``None``.
    """

    original: str
    placeholder: str
    patternType: Optional[str] = None
class NeutralizationResult(BaseModel):
    """Unified result model for every content type, suitable for API responses.

    Attributes:
        neutralized_text: The text after neutralization.
        mapping: String-to-string replacement mapping.
        attributes: One ``NeutralizationAttribute`` per replacement entry.
        processed_info: Additional processing information.
    """

    neutralized_text: str
    mapping: Dict[str, str]
    attributes: List[NeutralizationAttribute]
    processed_info: Dict[str, Any]
class CommonUtils:
    """Common utility functions for data processing.

    Every member is a static method, so the class never needs to be instantiated.
    """

    # Placeholder syntax "[<type>.<id>]": <type> is lowercase a-z, <id> consists
    # of lowercase hex digits and dashes. Always applied with fullmatch() so the
    # entire string has to be a placeholder (a '$' anchor with match() would
    # also accept a trailing newline).
    _PLACEHOLDER_PATTERN = re.compile(r'\[([a-z]+)\.([a-f0-9-]+)\]')

    @staticmethod
    def normalizeWhitespace(text: str) -> str:
        """
        Collapse each run of whitespace into one space and trim both ends.

        Line breaks are whitespace too, so they are replaced by spaces as well.

        Args:
            text: Text to normalize
        Returns:
            str: Normalized text
        """
        # No separate '\r\n' / '\r' handling is needed: the substitution below
        # already turns every carriage return and newline into a space.
        return re.sub(r'\s+', ' ', text).strip()

    @staticmethod
    def _isTableLine(line: str) -> bool:
        """
        Check if a line represents a table row.

        A line qualifies when it contains exactly one colon, or exactly one
        tab, with at least one character on each side of the separator.

        Args:
            line: Line to check
        Returns:
            bool: True if line is a table row
        """
        isColonRow = re.match(r'^\s*[^:]+:\s*[^:]+$', line)
        isTabRow = re.match(r'^\s*[^\t]+\t[^\t]+$', line)
        return bool(isColonRow or isTabRow)

    @staticmethod
    def detectContentType(content: str) -> str:
        """
        Detect the type of content based on its structure.

        The checks are heuristic and run in a fixed order (JSON, XML, CSV,
        binary); the first one that matches wins.

        Args:
            content: Content to analyze
        Returns:
            str: Content type ('csv', 'json', 'xml', 'text', 'binary')
        """
        content = content.strip()

        # JSON objects/arrays and XML are recognised only by their outer delimiters.
        delimiterRules = (
            ('{', '}', 'json'),
            ('[', ']', 'json'),
            ('<', '>', 'xml'),
        )
        for opener, closer, kind in delimiterRules:
            if content.startswith(opener) and content.endswith(closer):
                return kind

        # CSV: multi-line content whose first (up to) three lines all contain a comma.
        if ',' in content and '\n' in content:
            leadingLines = content.split('\n')[:3]
            if all(',' in line for line in leadingLines):
                return 'csv'

        # Binary: longer than 100 characters and containing a NUL character.
        if len(content) > 100 and '\x00' in content:
            return 'binary'

        # Default to text
        return 'text'

    @staticmethod
    def mergeMappings(*mappings: Dict[str, str]) -> Dict[str, str]:
        """
        Merge multiple mapping dictionaries into a new dictionary.

        The inputs are not modified. When a key occurs in several mappings,
        the value from the last one wins.

        Args:
            *mappings: Mapping dictionaries to merge
        Returns:
            Dict[str, str]: Merged mapping dictionary
        """
        merged: Dict[str, str] = {}
        for mapping in mappings:
            merged.update(mapping)
        return merged

    @staticmethod
    def createPlaceholder(placeholderType: str, placeholderId: str) -> str:
        """
        Create a placeholder string in the format [type.uuid].

        The arguments are inserted as given; no validation is performed here.

        Args:
            placeholderType: Type of placeholder (email, phone, name, etc.)
            placeholderId: Unique identifier for the placeholder
        Returns:
            str: Formatted placeholder string
        """
        return f"[{placeholderType}.{placeholderId}]"

    @staticmethod
    def validatePlaceholder(placeholder: str) -> bool:
        """
        Validate if a string is a valid placeholder.

        The whole string must have the form "[type.id]", where type is made of
        lowercase letters a-z and id of lowercase hex digits and dashes.

        Args:
            placeholder: String to validate
        Returns:
            bool: True if valid placeholder
        """
        return CommonUtils._PLACEHOLDER_PATTERN.fullmatch(placeholder) is not None

    @staticmethod
    def extractPlaceholderInfo(placeholder: str) -> Optional[tuple]:
        """
        Extract type and ID from a placeholder.

        Accepts exactly the strings for which validatePlaceholder returns True.

        Args:
            placeholder: Placeholder string
        Returns:
            Optional[tuple]: (type, id) or None if invalid
        """
        match = CommonUtils._PLACEHOLDER_PATTERN.fullmatch(placeholder)
        if match is None:
            return None
        return match.group(1), match.group(2)