diff --git a/modules/interfaces/interfaceAppAccess.py b/modules/interfaces/interfaceAppAccess.py
index 25b318ad..d91dddc1 100644
--- a/modules/interfaces/interfaceAppAccess.py
+++ b/modules/interfaces/interfaceAppAccess.py
@@ -74,6 +74,28 @@ class AppAccess:
             else:
                 # Regular users only see their own connections
                 filtered_records = [r for r in recordset if r.get("userId") == self.userId]
+        # Special handling for data neutralization config table
+        elif table_name == "DataNeutraliserConfig":
+            if self.privilege == UserPrivilege.SYSADMIN:
+                # SysAdmin sees all configs
+                filtered_records = recordset
+            elif self.privilege == UserPrivilege.ADMIN:
+                # Admin sees configs in their mandate
+                filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId]
+            else:
+                # Regular users only see their own configs
+                filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId and r.get("userId") == self.userId]
+        # Special handling for data neutralizer attributes table
+        elif table_name == "DataNeutralizerAttributes":
+            if self.privilege == UserPrivilege.SYSADMIN:
+                # SysAdmin sees all attributes
+                filtered_records = recordset
+            elif self.privilege == UserPrivilege.ADMIN:
+                # Admin sees attributes in their mandate
+                filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId]
+            else:
+                # Regular users only see their own attributes
+                filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId and r.get("userId") == self.userId]
         # System admins see all other records
         elif self.privilege == UserPrivilege.SYSADMIN:
             filtered_records = recordset
@@ -126,6 +148,37 @@ class AppAccess:
                 record["_hideEdit"] = record.get("userId") != self.userId
                 record["_hideDelete"] = record.get("userId") != self.userId
 
+        elif table_name == "DataNeutraliserConfig":
+            # Everyone can view configs they have access to
+            record["_hideView"] = False
+            # SysAdmin can edit/delete any config
+            if self.privilege == UserPrivilege.SYSADMIN:
+                record["_hideEdit"] = False
+                record["_hideDelete"] = False
+            # Admin can edit/delete configs in their mandate
+            elif self.privilege == UserPrivilege.ADMIN:
+                record["_hideEdit"] = record.get("mandateId","-") != self.mandateId
+                record["_hideDelete"] = record.get("mandateId","-") != self.mandateId
+            # Regular users can only edit/delete their own configs
+            else:
+                record["_hideEdit"] = record.get("userId") != self.userId
+                record["_hideDelete"] = record.get("userId") != self.userId
+        elif table_name == "DataNeutralizerAttributes":
+            # Everyone can view attributes they have access to
+            record["_hideView"] = False
+            # SysAdmin can edit/delete any attributes
+            if self.privilege == UserPrivilege.SYSADMIN:
+                record["_hideEdit"] = False
+                record["_hideDelete"] = False
+            # Admin can edit/delete attributes in their mandate
+            elif self.privilege == UserPrivilege.ADMIN:
+                record["_hideEdit"] = record.get("mandateId","-") != self.mandateId
+                record["_hideDelete"] = record.get("mandateId","-") != self.mandateId
+            # Regular users can only edit/delete their own attributes
+            else:
+                record["_hideEdit"] = record.get("userId") != self.userId
+                record["_hideDelete"] = record.get("userId") != self.userId
+
         elif table_name == "AuthEvent":
             # Only show auth events for the current user or if admin
             if self.privilege in [UserPrivilege.SYSADMIN, UserPrivilege.ADMIN]:
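Both new table branches apply the same visibility rule: SysAdmin sees everything, Admin sees the mandate, everyone else sees only their own rows. A minimal sketch of that rule as a standalone predicate (the enum values are illustrative stand-ins; the helper itself is hypothetical):

```python
from enum import Enum

class UserPrivilege(Enum):  # stand-in; values are assumptions
    SYSADMIN = "sysadmin"
    ADMIN = "admin"
    USER = "user"

def can_see(record: dict, privilege: UserPrivilege, mandate_id: str, user_id: str) -> bool:
    """Row-level visibility rule shared by both neutralization tables."""
    if privilege == UserPrivilege.SYSADMIN:
        return True
    if privilege == UserPrivilege.ADMIN:
        return record.get("mandateId", "-") == mandate_id
    return record.get("mandateId", "-") == mandate_id and record.get("userId") == user_id
```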
diff --git a/modules/interfaces/interfaceAppModel.py b/modules/interfaces/interfaceAppModel.py
index 73d8d146..15e2fbcb 100644
--- a/modules/interfaces/interfaceAppModel.py
+++ b/modules/interfaces/interfaceAppModel.py
@@ -427,6 +427,126 @@ register_model_labels(
     }
 )
 
+class DataNeutraliserConfig(BaseModel, ModelMixin):
+    """Data model for data neutralization configuration"""
+    id: str = Field(
+        default_factory=lambda: str(uuid.uuid4()),
+        description="Unique ID of the configuration",
+        frontend_type="text",
+        frontend_readonly=True,
+        frontend_required=False
+    )
+    mandateId: str = Field(
+        description="ID of the mandate this configuration belongs to",
+        frontend_type="text",
+        frontend_readonly=True,
+        frontend_required=True
+    )
+    userId: str = Field(
+        description="ID of the user who created this configuration",
+        frontend_type="text",
+        frontend_readonly=True,
+        frontend_required=True
+    )
+    enabled: bool = Field(
+        default=True,
+        description="Whether data neutralization is enabled",
+        frontend_type="checkbox",
+        frontend_readonly=False,
+        frontend_required=False
+    )
+    namesToParse: str = Field(
+        default="",
+        description="Multiline list of names to parse for neutralization",
+        frontend_type="textarea",
+        frontend_readonly=False,
+        frontend_required=False
+    )
+    sharepointSourcePath: str = Field(
+        default="",
+        description="SharePoint path to read files for neutralization",
+        frontend_type="text",
+        frontend_readonly=False,
+        frontend_required=False
+    )
+    sharepointTargetPath: str = Field(
+        default="",
+        description="SharePoint path to store neutralized files",
+        frontend_type="text",
+        frontend_readonly=False,
+        frontend_required=False
+    )
+
+# Register labels for DataNeutraliserConfig
+register_model_labels(
+    "DataNeutraliserConfig",
+    {"en": "Data Neutralization Config", "fr": "Configuration de neutralisation des données"},
+    {
+        "id": {"en": "ID", "fr": "ID"},
+        "mandateId": {"en": "Mandate ID", "fr": "ID de mandat"},
+        "userId": {"en": "User ID", "fr": "ID utilisateur"},
+        "enabled": {"en": "Enabled", "fr": "Activé"},
+        "namesToParse": {"en": "Names to Parse", "fr": "Noms à analyser"},
+        "sharepointSourcePath": {"en": "Source Path", "fr": "Chemin source"},
+        "sharepointTargetPath": {"en": "Target Path", "fr": "Chemin cible"}
+    }
+)
+
+class DataNeutralizerAttributes(BaseModel, ModelMixin):
+    """Data model for neutralized data attributes mapping"""
+    id: str = Field(
+        default_factory=lambda: str(uuid.uuid4()),
+        description="Unique ID of the attribute mapping (used as UID in neutralized files)",
+        frontend_type="text",
+        frontend_readonly=True,
+        frontend_required=False
+    )
+    mandateId: str = Field(
+        description="ID of the mandate this attribute belongs to",
+        frontend_type="text",
+        frontend_readonly=True,
+        frontend_required=True
+    )
+    userId: str = Field(
+        description="ID of the user who created this attribute",
+        frontend_type="text",
+        frontend_readonly=True,
+        frontend_required=True
+    )
+    originalText: str = Field(
+        description="Original text that was neutralized",
+        frontend_type="text",
+        frontend_readonly=True,
+        frontend_required=True
+    )
+    fileId: Optional[str] = Field(
+        default=None,
+        description="ID of the file this attribute belongs to",
+        frontend_type="text",
+        frontend_readonly=True,
+        frontend_required=False
+    )
+    patternType: str = Field(
+        description="Type of pattern that matched (email, phone, name, etc.)",
+        frontend_type="text",
+        frontend_readonly=True,
+        frontend_required=True
+    )
+
+# Register labels for DataNeutralizerAttributes
+register_model_labels(
+    "DataNeutralizerAttributes",
+    {"en": "Neutralized Data Attribute", "fr": "Attribut de données neutralisées"},
+    {
+        "id": {"en": "ID", "fr": "ID"},
+        "mandateId": {"en": "Mandate ID", "fr": "ID de mandat"},
+        "userId": {"en": "User ID", "fr": "ID utilisateur"},
+        "originalText": {"en": "Original Text", "fr": "Texte original"},
+        "fileId": {"en": "File ID", "fr": "ID de fichier"},
+        "patternType": {"en": "Pattern Type", "fr": "Type de modèle"}
+    }
+)
+
 class SystemTable(BaseModel, ModelMixin):
     """Data model for system table entries"""
     table_name: str = Field(
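The new models behave like ordinary pydantic models; the `frontend_*` keyword arguments are custom Field metadata consumed elsewhere in the app, as the diff implies. A minimal construction sketch (IDs are illustrative, and this assumes pydantic accepts the extra Field kwargs as the existing models already do):

```python
config = DataNeutraliserConfig(
    mandateId="mandate-123",          # illustrative IDs
    userId="user-456",
    namesToParse="Jane Doe\nJohn Smith",
)
assert config.enabled is True         # default applied by pydantic
print(config.id)                      # server-generated UUID4 string
```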
{"en": "Mandate ID", "fr": "ID de mandat"}, + "userId": {"en": "User ID", "fr": "ID utilisateur"}, + "originalText": {"en": "Original Text", "fr": "Texte original"}, + "fileId": {"en": "File ID", "fr": "ID de fichier"}, + "patternType": {"en": "Pattern Type", "fr": "Type de modèle"} + } +) + class SystemTable(BaseModel, ModelMixin): """Data model for system table entries""" table_name: str = Field( diff --git a/modules/interfaces/interfaceAppObjects.py b/modules/interfaces/interfaceAppObjects.py index c71e0c03..bc786fe1 100644 --- a/modules/interfaces/interfaceAppObjects.py +++ b/modules/interfaces/interfaceAppObjects.py @@ -11,6 +11,7 @@ import importlib import json from passlib.context import CryptContext import uuid +import re from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.configuration import APP_CONFIG @@ -19,7 +20,8 @@ from modules.interfaces.interfaceAppAccess import AppAccess from modules.interfaces.interfaceAppModel import ( User, Mandate, UserInDB, UserConnection, AuthAuthority, UserPrivilege, - ConnectionStatus, Token, AuthEvent + ConnectionStatus, Token, AuthEvent, + DataNeutraliserConfig, DataNeutralizerAttributes ) logger = logging.getLogger(__name__) @@ -993,6 +995,211 @@ class AppObjects: logger.error(f"Error during logout: {str(e)}") raise + # Data Neutralization methods + + def getNeutralizationConfig(self) -> Optional[DataNeutraliserConfig]: + """Get the data neutralization configuration for the current user's mandate""" + try: + configs = self.db.getRecordset(DataNeutraliserConfig, recordFilter={"mandateId": self.mandateId}) + if not configs: + return None + + # Apply access control + filtered_configs = self._uam(DataNeutraliserConfig, configs) + if not filtered_configs: + return None + + return DataNeutraliserConfig.from_dict(filtered_configs[0]) + + except Exception as e: + logger.error(f"Error getting neutralization config: {str(e)}") + return None + + def createOrUpdateNeutralizationConfig(self, config_data: Dict[str, Any]) -> DataNeutraliserConfig: + """Create or update the data neutralization configuration""" + try: + # Check if config already exists + existing_config = self.getNeutralizationConfig() + + if existing_config: + # Update existing config + update_data = existing_config.to_dict() + update_data.update(config_data) + update_data["updatedAt"] = get_utc_timestamp() + + updated_config = DataNeutraliserConfig.from_dict(update_data) + self.db.recordModify(DataNeutraliserConfig, existing_config.id, updated_config) + + return updated_config + else: + # Create new config + config_data["mandateId"] = self.mandateId + config_data["userId"] = self.userId + + new_config = DataNeutraliserConfig.from_dict(config_data) + created_record = self.db.recordCreate(DataNeutraliserConfig, new_config) + + return DataNeutraliserConfig.from_dict(created_record) + + except Exception as e: + logger.error(f"Error creating/updating neutralization config: {str(e)}") + raise ValueError(f"Failed to create/update neutralization config: {str(e)}") + + def neutralizeText(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]: + """Neutralize text content and store attribute mappings""" + try: + from modules.neutralizer.neutralizer import DataAnonymizer + + # Get neutralization configuration to extract namesToParse + config = self.getNeutralizationConfig() + names_to_parse = [] + if config and hasattr(config, 'namesToParse') and config.namesToParse: + # Split by newlines and filter out empty strings + names_to_parse = [name.strip() for 
+    def neutralizeText(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]:
+        """Neutralize text content and store attribute mappings"""
+        try:
+            from modules.neutralizer.neutralizer import DataAnonymizer
+
+            # Get neutralization configuration to extract namesToParse
+            config = self.getNeutralizationConfig()
+            names_to_parse = []
+            if config and hasattr(config, 'namesToParse') and config.namesToParse:
+                # Split by newlines and filter out empty strings
+                names_to_parse = [name.strip() for name in config.namesToParse.split('\n') if name.strip()]
+
+            # Initialize anonymizer with custom names
+            anonymizer = DataAnonymizer(names_to_parse=names_to_parse)
+
+            # Process the text
+            result = anonymizer.process_content(text, 'text')
+
+            # Store attribute mappings in database
+            stored_attributes = []
+            for original_text, neutralized_text in result.mapping.items():
+                # Extract pattern type and UUID from the neutralized text format [type.uuid]
+                pattern_type = "unknown"
+                placeholder_uuid = None
+
+                if neutralized_text.startswith("[") and "." in neutralized_text and neutralized_text.endswith("]"):
+                    # Extract type and UUID from [type.uuid] format
+                    inner = neutralized_text[1:-1]  # Remove [ and ]
+                    if "." in inner:
+                        pattern_type, placeholder_uuid = inner.split(".", 1)
+
+                # Check if this exact original text already has a placeholder in the database
+                existing_attribute = self.getExistingPlaceholder(original_text)
+
+                if existing_attribute:
+                    # Reuse existing placeholder
+                    existing_uuid = existing_attribute.id
+                    existing_pattern_type = existing_attribute.patternType
+
+                    # Update the neutralized text to use the existing UUID
+                    result.data = result.data.replace(neutralized_text, f"[{existing_pattern_type}.{existing_uuid}]")
+                    result.mapping[original_text] = f"[{existing_pattern_type}.{existing_uuid}]"
+
+                    stored_attributes.append(existing_attribute)
+                else:
+                    # Create new attribute record with the UUID that the neutralizer generated
+                    attribute_data = {
+                        "id": placeholder_uuid,  # Use the UUID from the neutralizer
+                        "mandateId": self.mandateId,
+                        "userId": self.userId,
+                        "originalText": original_text,
+                        "fileId": file_id,
+                        "patternType": pattern_type
+                    }
+
+                    attribute = DataNeutralizerAttributes.from_dict(attribute_data)
+                    created_attribute = self.db.recordCreate(DataNeutralizerAttributes, attribute)
+                    stored_attributes.append(created_attribute)
+
+            # The neutralized text is already in the correct [type.uuid] format
+            # No need to replace it, as it's already properly formatted
+
+            return {
+                "neutralized_text": result.data,
+                "attributes": stored_attributes,
+                "mapping": result.mapping,
+                "replaced_fields": result.replaced_fields,
+                "processed_info": result.processed_info
+            }
+
+        except Exception as e:
+            logger.error(f"Error neutralizing text: {str(e)}")
+            raise ValueError(f"Failed to neutralize text: {str(e)}")
+
+    def getExistingPlaceholder(self, original_text: str) -> Optional[DataNeutralizerAttributes]:
+        """Get existing placeholder for original text if it exists"""
+        try:
+            existing_attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter={
+                "mandateId": self.mandateId,
+                "userId": self.userId,
+                "originalText": original_text
+            })
+
+            if existing_attributes:
+                return DataNeutralizerAttributes.from_dict(existing_attributes[0])
+            return None
+
+        except Exception as e:
+            logger.error(f"Error getting existing placeholder: {str(e)}")
+            return None
+
+    def getNeutralizationAttributes(self, file_id: Optional[str] = None) -> List[DataNeutralizerAttributes]:
+        """Get neutralization attributes, optionally filtered by file ID"""
+        try:
+            filter_dict = {"mandateId": self.mandateId}
+            if file_id:
+                filter_dict["fileId"] = file_id
+
+            attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter=filter_dict)
+            filtered_attributes = self._uam(DataNeutralizerAttributes, attributes)
+
+            return [DataNeutralizerAttributes.from_dict(attr) for attr in filtered_attributes]
+
+        except Exception as e:
+            logger.error(f"Error getting neutralization attributes: {str(e)}")
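Every mapping entry uses the `[type.uuid]` placeholder format that `neutralizeText` splits into `pattern_type` and `placeholder_uuid`. That parse step in isolation (the helper is hypothetical; the format matches the diff):

```python
from typing import Optional, Tuple

def parse_placeholder(token: str) -> Optional[Tuple[str, str]]:
    """Split a '[type.uuid]' placeholder into (pattern_type, uuid), else None."""
    if token.startswith("[") and token.endswith("]") and "." in token:
        inner = token[1:-1]  # strip the brackets
        pattern_type, placeholder_uuid = inner.split(".", 1)
        return pattern_type, placeholder_uuid
    return None

print(parse_placeholder("[email.4ad60e30-1e6e-4b0d-9bff-3f1a8c1f2ab9]"))
# -> ('email', '4ad60e30-1e6e-4b0d-9bff-3f1a8c1f2ab9')
```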
+            return []
+
+    def resolveNeutralizedText(self, text: str) -> str:
+        """Resolve UIDs in neutralized text back to original text"""
+        try:
+            # Find all placeholders in the new format [type.uuid]
+            placeholder_pattern = r'\[([a-z]+)\.([a-f0-9-]{36})\]'
+            matches = re.findall(placeholder_pattern, text)
+
+            resolved_text = text
+            for placeholder_type, uid in matches:
+                # Find the attribute with this UID (which is the record ID)
+                attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter={
+                    "mandateId": self.mandateId,
+                    "id": uid
+                })
+
+                if attributes:
+                    attribute = attributes[0]
+                    # Replace placeholder with original text
+                    placeholder = f"[{placeholder_type}.{uid}]"
+                    resolved_text = resolved_text.replace(placeholder, attribute["originalText"])
+                else:
+                    logger.warning(f"No attribute found for UID {uid}")
+
+            return resolved_text
+
+        except Exception as e:
+            logger.error(f"Error resolving neutralized text: {str(e)}")
+            return text
+
+    def deleteNeutralizationAttributes(self, file_id: str) -> bool:
+        """Delete all neutralization attributes for a specific file"""
+        try:
+            attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter={
+                "mandateId": self.mandateId,
+                "fileId": file_id
+            })
+
+            for attribute in attributes:
+                self.db.recordDelete(DataNeutralizerAttributes, attribute["id"])
+
+            logger.info(f"Deleted {len(attributes)} neutralization attributes for file {file_id}")
+            return True
+
+        except Exception as e:
+            logger.error(f"Error deleting neutralization attributes: {str(e)}")
+            return False
+
 # Public Methods
 
 def getInterface(currentUser: User) -> AppObjects:
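The resolver's placeholder regex can be exercised on its own; `[a-f0-9-]{36}` matches the canonical 36-character textual form of a UUID4. A quick check with illustrative text:

```python
import re

placeholder_pattern = r'\[([a-z]+)\.([a-f0-9-]{36})\]'
text = "Contact [email.4ad60e30-1e6e-4b0d-9bff-3f1a8c1f2ab9] for details."
print(re.findall(placeholder_pattern, text))
# -> [('email', '4ad60e30-1e6e-4b0d-9bff-3f1a8c1f2ab9')]
```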
diff --git a/modules/neutralizer/neutralizer.py b/modules/neutralizer/neutralizer.py
index 6d722f29..87427611 100644
--- a/modules/neutralizer/neutralizer.py
+++ b/modules/neutralizer/neutralizer.py
@@ -50,10 +50,15 @@ class ProcessResult:
 
 class DataAnonymizer:
     """Main class for data anonymization"""
 
-    def __init__(self):
-        """Initialize the anonymizer with patterns"""
+    def __init__(self, names_to_parse: List[str] = None):
+        """Initialize the anonymizer with patterns and custom names
+
+        Args:
+            names_to_parse: List of names to parse and replace (case-insensitive)
+        """
         self.header_patterns = HeaderPatterns.patterns
         self.data_patterns = DataPatterns.patterns
+        self.names_to_parse = names_to_parse or []
         self.replaced_fields = set()
         self.mapping = {}
         self.processing_info = []
@@ -63,6 +68,7 @@ class DataAnonymizer:
         text = re.sub(r'\s+', ' ', text)
         text = text.replace('\r\n', '\n').replace('\r', '\n')
         return text.strip()
+
 
     def _is_table_line(self, line: str) -> bool:
         """Check if a line represents a table row"""
@@ -110,46 +116,72 @@ class DataAnonymizer:
 
         except Exception as e:
             logger.error(f"Error anonymizing table: {str(e)}")
-            logger.debug(traceback.format_exc())
             raise
 
     def _anonymize_plain_text(self, text: PlainText) -> PlainText:
-        """Anonymize plain text content"""
+        """Anonymize plain text content using simple search-and-replace approach"""
         try:
-            # Process the entire text at once instead of line by line
             current_text = text.content
 
-            # Find all matches in the entire text
-            matches = find_patterns_in_text(current_text, self.data_patterns)
-
-            # Process matches in reverse order to avoid position shifting
-            for match in sorted(matches, key=lambda x: x[2], reverse=True):
-                pattern_name, matched_text, start, end = match
+            # Step 1: Replace custom names first (simple regex search-and-replace)
+            for name in self.names_to_parse:
+                if not name.strip():
+                    continue
+
+                # Create case-insensitive regex pattern with word boundaries
+                pattern = re.compile(r'\b' + re.escape(name.strip()) + r'\b', re.IGNORECASE)
 
-                # Skip if the matched text is already a placeholder
-                if re.match(r'\[[A-Z_]+\d+\]', matched_text):
+                # Find all matches for this name
+                matches = list(pattern.finditer(current_text))
+
+                # Replace each match with a placeholder
+                for match in reversed(matches):  # Process from right to left to avoid position shifts
+                    matched_text = match.group()
+                    if matched_text not in self.mapping:
+                        # Generate a UUID for the placeholder
+                        import uuid
+                        placeholder_id = str(uuid.uuid4())
+                        self.mapping[matched_text] = f"[name.{placeholder_id}]"
+
+                    replacement = self.mapping[matched_text]
+                    start, end = match.span()
+                    current_text = current_text[:start] + replacement + current_text[end:]
+
+            # Step 2: Replace pattern-based matches (emails, phones, etc.)
+            # Use the same simple approach for patterns
+            pattern_matches = find_patterns_in_text(current_text, self.data_patterns)
+
+            # Process pattern matches from right to left to avoid position shifts
+            for pattern_name, matched_text, start, end in reversed(pattern_matches):
+                # Skip if already a placeholder
+                if re.match(r'\[[a-z]+\.[a-f0-9-]+\]', matched_text):
                     continue
 
-                # Find the pattern that matched
-                pattern = next((p for p in self.data_patterns if p.name == pattern_name), None)
-                if pattern:
-                    # Use the pattern's replacement template
-                    if matched_text not in self.mapping:
-                        self.mapping[matched_text] = pattern.replacement_template.format(len(self.mapping) + 1)
-                    replacement = self.mapping[matched_text]
-
-                    if pattern_name == 'email':
-                        print(f"DEBUG: Replacing email '{matched_text}' with '{replacement}'")
-                        print(f"DEBUG: Text after replacement: {current_text[:start] + replacement + current_text[end:]}")
+                # Skip if contains placeholder characters
+                if '[' in matched_text or ']' in matched_text:
+                    continue
 
-                    # Replace the matched text while preserving surrounding whitespace
+                if matched_text not in self.mapping:
+                    # Generate a UUID for the placeholder
+                    import uuid
+                    placeholder_id = str(uuid.uuid4())
+                    # Create placeholder in format [type.uuid]
+                    type_mapping = {
+                        'email': 'email',
+                        'phone': 'phone',
+                        'address': 'address',
+                        'id': 'id'
+                    }
+                    placeholder_type = type_mapping.get(pattern_name, 'data')
+                    self.mapping[matched_text] = f"[{placeholder_type}.{placeholder_id}]"
+
+                replacement = self.mapping[matched_text]
                 current_text = current_text[:start] + replacement + current_text[end:]
 
             return PlainText(content=current_text, source_type=text.source_type)
 
         except Exception as e:
             logger.error(f"Error anonymizing plain text: {str(e)}")
-            logger.debug(traceback.format_exc())
             raise
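Step 1's name replacement hinges on `re.escape` plus `\b` word boundaries, so "Ann" does not also hit "Annette", while `re.IGNORECASE` catches case variants. A quick illustration with sample strings:

```python
import re

name = "Ann"
pattern = re.compile(r'\b' + re.escape(name) + r'\b', re.IGNORECASE)

print(pattern.findall("Ann met ANN, but not Annette."))
# -> ['Ann', 'ANN']  -- 'Annette' is untouched thanks to the word boundary
```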
@@ -173,16 +205,49 @@ class DataAnonymizer:
             pattern = get_pattern_for_header(key, self.header_patterns)
             if pattern:
                 if value not in self.mapping:
-                    self.mapping[value] = pattern.replacement_template.format(len(self.mapping) + 1)
+                    # Generate a UUID for the placeholder
+                    import uuid
+                    placeholder_id = str(uuid.uuid4())
+                    # Create placeholder in format [type.uuid]
+                    type_mapping = {
+                        'email': 'email',
+                        'phone': 'phone',
+                        'name': 'name',
+                        'address': 'address',
+                        'id': 'id'
+                    }
+                    placeholder_type = type_mapping.get(pattern.name, 'data')
+                    self.mapping[value] = f"[{placeholder_type}.{placeholder_id}]"
                 return self.mapping[value]
 
         # Check if the value itself matches any patterns
-        matches = find_patterns_in_text(value, self.data_patterns)
-        if matches:
-            # Use the first match's pattern
-            pattern_name = matches[0][0]
-            if value not in self.mapping:
-                self.mapping[value] = f"{pattern_name.upper()}_{len(self.mapping) + 1}"
+        pattern_matches = find_patterns_in_text(value, self.data_patterns)
+        custom_name_matches = self._find_custom_names(value)
+
+        if pattern_matches or custom_name_matches:
+            # Use the first match's pattern or custom name
+            if pattern_matches:
+                pattern_name = pattern_matches[0][0]
+                if value not in self.mapping:
+                    # Generate a UUID for the placeholder
+                    import uuid
+                    placeholder_id = str(uuid.uuid4())
+                    # Create placeholder in format [type.uuid]
+                    type_mapping = {
+                        'email': 'email',
+                        'phone': 'phone',
+                        'name': 'name',
+                        'address': 'address',
+                        'id': 'id'
+                    }
+                    placeholder_type = type_mapping.get(pattern_name, 'data')
+                    self.mapping[value] = f"[{placeholder_type}.{placeholder_id}]"
+            elif custom_name_matches:
+                if value not in self.mapping:
+                    # Generate a UUID for the placeholder
+                    import uuid
+                    placeholder_id = str(uuid.uuid4())
+                    self.mapping[value] = f"[name.{placeholder_id}]"
             return self.mapping[value]
 
         return value
@@ -207,7 +272,19 @@ class DataAnonymizer:
             pattern = get_pattern_for_header(attr_name, self.header_patterns)
             if pattern:
                 if attr_value not in self.mapping:
-                    self.mapping[attr_value] = pattern.replacement_template.format(len(self.mapping) + 1)
+                    # Generate a UUID for the placeholder
+                    import uuid
+                    placeholder_id = str(uuid.uuid4())
+                    # Create placeholder in format [type.uuid]
+                    type_mapping = {
+                        'email': 'email',
+                        'phone': 'phone',
+                        'name': 'name',
+                        'address': 'address',
+                        'id': 'id'
+                    }
+                    placeholder_type = type_mapping.get(pattern.name, 'data')
+                    self.mapping[attr_value] = f"[{placeholder_type}.{placeholder_id}]"
                 processed_attrs[attr_name] = self.mapping[attr_value]
             else:
                 # Check if attribute value matches any data patterns
@@ -217,7 +294,19 @@ class DataAnonymizer:
                 pattern = next((p for p in self.data_patterns if p.name == pattern_name), None)
                 if pattern:
                     if attr_value not in self.mapping:
-                        self.mapping[attr_value] = pattern.replacement_template.format(len(self.mapping) + 1)
+                        # Generate a UUID for the placeholder
+                        import uuid
+                        placeholder_id = str(uuid.uuid4())
+                        # Create placeholder in format [type.uuid]
+                        type_mapping = {
+                            'email': 'email',
+                            'phone': 'phone',
+                            'name': 'name',
+                            'address': 'address',
+                            'id': 'id'
+                        }
+                        placeholder_type = type_mapping.get(pattern_name, 'data')
+                        self.mapping[attr_value] = f"[{placeholder_type}.{placeholder_id}]"
                     processed_attrs[attr_name] = self.mapping[attr_value]
                 else:
                     processed_attrs[attr_name] = attr_value
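The same UUID-placeholder construction, including the mostly-identity `type_mapping` dict, recurs in five places across this file. A hypothetical helper that would centralize it, shown only as a sketch:

```python
import uuid

_KNOWN_TYPES = {'email', 'phone', 'name', 'address', 'id'}

def make_placeholder(pattern_name: str) -> str:
    """Build a '[type.uuid]' placeholder, falling back to 'data' for unknown types."""
    placeholder_type = pattern_name if pattern_name in _KNOWN_TYPES else 'data'
    return f"[{placeholder_type}.{uuid.uuid4()}]"
```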
@@ -230,14 +319,36 @@ class DataAnonymizer:
             # Process text content
             text = element.text.strip() if element.text and element.text.strip() else ''
             if text:
-                # Check if text matches any patterns
-                matches = find_patterns_in_text(text, self.data_patterns)
-                if matches:
-                    pattern_name = matches[0][0]
-                    pattern = next((p for p in self.data_patterns if p.name == pattern_name), None)
-                    if pattern:
+                # Check if text matches any patterns or custom names
+                pattern_matches = find_patterns_in_text(text, self.data_patterns)
+                custom_name_matches = self._find_custom_names(text)
+
+                if pattern_matches or custom_name_matches:
+                    if pattern_matches:
+                        pattern_name = pattern_matches[0][0]
+                        pattern = next((p for p in self.data_patterns if p.name == pattern_name), None)
+                        if pattern:
+                            if text not in self.mapping:
+                                # Generate a UUID for the placeholder
+                                import uuid
+                                placeholder_id = str(uuid.uuid4())
+                                # Create placeholder in format [type.uuid]
+                                type_mapping = {
+                                    'email': 'email',
+                                    'phone': 'phone',
+                                    'name': 'name',
+                                    'address': 'address',
+                                    'id': 'id'
+                                }
+                                placeholder_type = type_mapping.get(pattern_name, 'data')
+                                self.mapping[text] = f"[{placeholder_type}.{placeholder_id}]"
+                            text = self.mapping[text]
+                    elif custom_name_matches:
                         if text not in self.mapping:
-                            self.mapping[text] = pattern.replacement_template.format(len(self.mapping) + 1)
+                            # Generate a UUID for the placeholder
+                            import uuid
+                            placeholder_id = str(uuid.uuid4())
+                            self.mapping[text] = f"[name.{placeholder_id}]"
                         text = self.mapping[text]
 
         # Process child elements
@@ -271,18 +382,24 @@ class DataAnonymizer:
             ProcessResult: Contains anonymized data, mapping, replaced fields and processing info
         """
         try:
+            # Check if content is binary data
             is_binary = False
             try:
-                # Try to decode base64 if it's a string
-                try:
-                    decoded = base64.b64decode(content)
-                    # If it's not valid text, consider it binary
-                    decoded.decode('utf-8')
-                except (base64.binascii.Error, UnicodeDecodeError):
-                    is_binary = True
-            except Exception:
-                is_binary = True
+                # First, check if content looks like base64 (contains only base64 characters)
+                if re.match(r'^[A-Za-z0-9+/]*={0,2}$', content.strip()):
+                    # Try to decode base64 if it looks like base64
+                    try:
+                        decoded = base64.b64decode(content)
+                        # Content that round-trips as base64-encoded UTF-8 is treated as an encoded payload
+                        decoded.decode('utf-8')
+                        is_binary = True
+                    except (base64.binascii.Error, UnicodeDecodeError):
+                        is_binary = False
+                else:
+                    is_binary = False
+            except Exception as e:
+                is_binary = False
 
             if is_binary:
                 # TODO: Implement binary data neutralization
@@ -356,7 +473,7 @@ class DataAnonymizer:
 
             # Combine all processed content
             result = content
-            for text, anonymized_text in zip(plain_texts, anonymized_texts):
+            for i, (text, anonymized_text) in enumerate(zip(plain_texts, anonymized_texts)):
                 if text.content != anonymized_text.content:
                     result = result.replace(text.content, anonymized_text.content)
 
@@ -364,5 +481,4 @@ class DataAnonymizer:
 
         except Exception as e:
             logger.error(f"Error processing content: {str(e)}")
-            logger.debug(traceback.format_exc())
             return ProcessResult(None, self.mapping, [], {'type': 'error', 'error': str(e)})
\ No newline at end of file
diff --git a/modules/neutralizer/patterns.py b/modules/neutralizer/patterns.py
index 4cfbed93..175a690f 100644
--- a/modules/neutralizer/patterns.py
+++ b/modules/neutralizer/patterns.py
@@ -232,16 +232,6 @@ class HeaderPatterns:
 class DataPatterns:
     """Patterns for identifying sensitive data in content"""
     patterns = [
-        # Name patterns
-        Pattern(
-            name="name",
-            patterns=[
-                # Person names with titles and academic degrees
-                r'\b(?:Dr\.|Prof\.|PhD\.?|MD\.?|Herr|Frau|Mr\.|Mrs\.|Ms\.|Monsieur|Madame|Signore|Signora)\s+[A-Z][a-z]{2,}(?:\s+[A-Za-z]{2,}){1,2}\b'
-            ],
-            replacement_template="[NAME_{}]"
-        ),
-
         # Email pattern for plain text
         Pattern(
             name="email",
@@ -392,11 +382,6 @@ def find_patterns_in_text(text: str, patterns: List[Pattern]) -> List[tuple]:
     matches = []
     for pattern in patterns:
         for p in pattern.patterns:
-            if pattern.name == 'email':
-                print(f"\nDEBUG: Checking email pattern '{p}'")
             for match in re.finditer(p, text, re.IGNORECASE):
-                if pattern.name == 'email':
-                    print(f"DEBUG: Found email match: '{match.group(0)}' at position {match.start()}-{match.end()}")
-                    print(f"DEBUG: Context: '{text[max(0, match.start()-20):match.end()+20]}'")
                 matches.append((pattern.name, match.group(0), match.start(), match.end()))
     return sorted(matches, key=lambda x: x[2])  # Sort by start position
\ No newline at end of file
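The reworked binary gate now only treats content as binary when it both matches the base64 alphabet and decodes cleanly to UTF-8; anything else falls through to text processing. A mirror of that gate for experimentation (hypothetical helper, same branch logic as the diff):

```python
import base64
import re

def looks_like_encoded_payload(content: str) -> bool:
    """Mirror of the process_content gate: base64 that decodes to UTF-8 text."""
    if not re.match(r'^[A-Za-z0-9+/]*={0,2}$', content.strip()):
        return False  # contains characters outside the base64 alphabet
    try:
        base64.b64decode(content).decode('utf-8')
        return True
    except (base64.binascii.Error, UnicodeDecodeError, ValueError):
        return False

print(looks_like_encoded_payload("Hello world"))                     # False: the space fails the alphabet check
print(looks_like_encoded_payload(base64.b64encode(b"hi").decode()))  # True: "aGk=" round-trips as encoded UTF-8
```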
diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py
index 2be59f21..3243bc21 100644
--- a/modules/routes/routeDataFiles.py
+++ b/modules/routes/routeDataFiles.py
@@ -17,7 +17,8 @@ from modules.security.auth import limiter, getCurrentUser
 import modules.interfaces.interfaceComponentObjects as interfaceComponentObjects
 from modules.interfaces.interfaceComponentModel import FileItem, FilePreview
 from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse, AttributeDefinition
-from modules.interfaces.interfaceAppModel import User
+from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes
+from modules.services.serviceNeutralization import NeutralizationService
 
 # Configure logger
 logger = logging.getLogger(__name__)
@@ -364,3 +365,253 @@ async def preview_file(
             detail=f"Error previewing file: {str(e)}"
         )
 
+# Data Neutralization endpoints
+
+@router.get("/neutralization/config", response_model=DataNeutraliserConfig)
+@limiter.limit("30/minute")
+async def get_neutralization_config(
+    request: Request,
+    currentUser: User = Depends(getCurrentUser)
+) -> DataNeutraliserConfig:
+    """Get data neutralization configuration"""
+    try:
+        service = NeutralizationService(currentUser)
+        config = service.get_config()
+
+        if not config:
+            # Return default config instead of 404
+            return DataNeutraliserConfig(
+                mandateId=currentUser.mandateId,
+                userId=currentUser.id,
+                enabled=True,
+                namesToParse="",
+                sharepointSourcePath="",
+                sharepointTargetPath=""
+            )
+
+        return config
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error getting neutralization config: {str(e)}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Error getting neutralization config: {str(e)}"
+        )
+
+@router.post("/neutralization/config", response_model=DataNeutraliserConfig)
+@limiter.limit("10/minute")
+async def save_neutralization_config(
+    request: Request,
+    config_data: Dict[str, Any] = Body(...),
+    currentUser: User = Depends(getCurrentUser)
+) -> DataNeutraliserConfig:
+    """Save or update data neutralization configuration"""
+    try:
+        service = NeutralizationService(currentUser)
+        config = service.save_config(config_data)
+
+        return config
+
+    except Exception as e:
+        logger.error(f"Error saving neutralization config: {str(e)}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Error saving neutralization config: {str(e)}"
+        )
+
+@router.post("/neutralization/neutralize-text", response_model=Dict[str, Any])
+@limiter.limit("20/minute")
+async def neutralize_text(
+    request: Request,
+    text_data: Dict[str, Any] = Body(...),
+    currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+    """Neutralize text content"""
+    try:
+        text = text_data.get("text", "")
+        file_id = text_data.get("fileId")
+
+        if not text:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail="Text content is required"
+            )
+
+        service = NeutralizationService(currentUser)
+        result = service.neutralize_text(text, file_id)
+
+        return result
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error neutralizing text: {str(e)}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Error neutralizing text: {str(e)}"
+        )
+
+@router.post("/neutralization/resolve-text", response_model=Dict[str, str])
+@limiter.limit("20/minute")
+async def resolve_text(
+    request: Request,
+    text_data: Dict[str, str] = Body(...),
+    currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, str]:
+    """Resolve UIDs in neutralized text back to original text"""
+    try:
+        text = text_data.get("text", "")
+
+        if not text:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail="Text content is required"
+            )
+
+        service = NeutralizationService(currentUser)
+        resolved_text = service.resolve_text(text)
+
+        return {"resolved_text": resolved_text}
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error resolving text: {str(e)}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Error resolving text: {str(e)}"
+        )
+
+@router.get("/neutralization/attributes", response_model=List[DataNeutralizerAttributes])
+@limiter.limit("30/minute")
+async def get_neutralization_attributes(
+    request: Request,
+    fileId: Optional[str] = Query(None, description="Filter by file ID"),
+    currentUser: User = Depends(getCurrentUser)
+) -> List[DataNeutralizerAttributes]:
+    """Get neutralization attributes, optionally filtered by file ID"""
+    try:
+        service = NeutralizationService(currentUser)
+        attributes = service.get_attributes(fileId)
+
+        return attributes
+
+    except Exception as e:
+        logger.error(f"Error getting neutralization attributes: {str(e)}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Error getting neutralization attributes: {str(e)}"
+        )
+
+@router.post("/neutralization/process-sharepoint", response_model=Dict[str, Any])
+@limiter.limit("5/minute")
+async def process_sharepoint_files(
+    request: Request,
+    paths_data: Dict[str, str] = Body(...),
+    currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+    """Process files from SharePoint source path and store neutralized files in target path"""
+    try:
+        source_path = paths_data.get("sourcePath", "")
+        target_path = paths_data.get("targetPath", "")
+
+        if not source_path or not target_path:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail="Both source and target paths are required"
+            )
+
+        service = NeutralizationService(currentUser)
+        result = await service.process_sharepoint_files(source_path, target_path)
+
+        return result
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error processing SharePoint files: {str(e)}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Error processing SharePoint files: {str(e)}"
+        )
+
+@router.post("/neutralization/batch-process", response_model=Dict[str, Any])
+@limiter.limit("10/minute")
+async def batch_process_files(
+    request: Request,
+    files_data: List[Dict[str, Any]] = Body(...),
+    currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+    """Process multiple files for neutralization"""
+    try:
+        if not files_data:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail="Files data is required"
+            )
+
+        service = NeutralizationService(currentUser)
+        result = service.batch_neutralize_files(files_data)
+
+        return result
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error batch processing files: {str(e)}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Error batch processing files: {str(e)}"
+        )
+
statistics""" + try: + service = NeutralizationService(currentUser) + stats = service.get_processing_stats() + + return stats + + except Exception as e: + logger.error(f"Error getting neutralization stats: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error getting neutralization stats: {str(e)}" + ) + +@router.delete("/neutralization/attributes/{fileId}", response_model=Dict[str, str]) +@limiter.limit("10/minute") +async def cleanup_file_attributes( + request: Request, + fileId: str = Path(..., description="File ID to cleanup attributes for"), + currentUser: User = Depends(getCurrentUser) +) -> Dict[str, str]: + """Clean up neutralization attributes for a specific file""" + try: + service = NeutralizationService(currentUser) + success = service.cleanup_file_attributes(fileId) + + if success: + return {"message": f"Successfully cleaned up attributes for file {fileId}"} + else: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to cleanup file attributes" + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error cleaning up file attributes: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error cleaning up file attributes: {str(e)}" + ) + diff --git a/modules/services/serviceNeutralization.py b/modules/services/serviceNeutralization.py new file mode 100644 index 00000000..e5c75a37 --- /dev/null +++ b/modules/services/serviceNeutralization.py @@ -0,0 +1,587 @@ +""" +Data Neutralization Service +Handles file processing for data neutralization including SharePoint integration +""" + +import logging +import os +import uuid +from typing import Dict, List, Any, Optional, Tuple +from datetime import datetime +from pathlib import Path +import mimetypes + +from modules.interfaces.interfaceAppObjects import getInterface +from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes +from modules.neutralizer.neutralizer import DataAnonymizer +from modules.shared.timezoneUtils import get_utc_timestamp + +logger = logging.getLogger(__name__) + +class NeutralizationService: + """Service for handling data neutralization operations""" + + def __init__(self, current_user: User): + """Initialize the service with user context""" + self.current_user = current_user + self.app_interface = getInterface(current_user) + + def get_config(self) -> Optional[DataNeutraliserConfig]: + """Get the neutralization configuration for the current user's mandate""" + return self.app_interface.getNeutralizationConfig() + + def save_config(self, config_data: Dict[str, Any]) -> DataNeutraliserConfig: + """Save or update the neutralization configuration""" + return self.app_interface.createOrUpdateNeutralizationConfig(config_data) + + def neutralize_text(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]: + """Neutralize text content and return results with attribute mappings""" + return self.app_interface.neutralizeText(text, file_id) + + def get_attributes(self, file_id: Optional[str] = None) -> List[DataNeutralizerAttributes]: + """Get neutralization attributes, optionally filtered by file ID""" + return self.app_interface.getNeutralizationAttributes(file_id) + + def resolve_text(self, text: str) -> str: + """Resolve UIDs in neutralized text back to original text""" + return self.app_interface.resolveNeutralizedText(text) + + async def process_sharepoint_files(self, source_path: str, target_path: str) -> 
diff --git a/modules/services/serviceNeutralization.py b/modules/services/serviceNeutralization.py
new file mode 100644
index 00000000..e5c75a37
--- /dev/null
+++ b/modules/services/serviceNeutralization.py
@@ -0,0 +1,587 @@
+"""
+Data Neutralization Service
+Handles file processing for data neutralization including SharePoint integration
+"""
+
+import logging
+import os
+import uuid
+from typing import Dict, List, Any, Optional, Tuple
+from datetime import datetime
+from pathlib import Path
+import mimetypes
+
+from modules.interfaces.interfaceAppObjects import getInterface
+from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes
+from modules.neutralizer.neutralizer import DataAnonymizer
+from modules.shared.timezoneUtils import get_utc_timestamp
+
+logger = logging.getLogger(__name__)
+
+class NeutralizationService:
+    """Service for handling data neutralization operations"""
+
+    def __init__(self, current_user: User):
+        """Initialize the service with user context"""
+        self.current_user = current_user
+        self.app_interface = getInterface(current_user)
+
+    def get_config(self) -> Optional[DataNeutraliserConfig]:
+        """Get the neutralization configuration for the current user's mandate"""
+        return self.app_interface.getNeutralizationConfig()
+
+    def save_config(self, config_data: Dict[str, Any]) -> DataNeutraliserConfig:
+        """Save or update the neutralization configuration"""
+        return self.app_interface.createOrUpdateNeutralizationConfig(config_data)
+
+    def neutralize_text(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]:
+        """Neutralize text content and return results with attribute mappings"""
+        return self.app_interface.neutralizeText(text, file_id)
+
+    def get_attributes(self, file_id: Optional[str] = None) -> List[DataNeutralizerAttributes]:
+        """Get neutralization attributes, optionally filtered by file ID"""
+        return self.app_interface.getNeutralizationAttributes(file_id)
+
+    def resolve_text(self, text: str) -> str:
+        """Resolve UIDs in neutralized text back to original text"""
+        return self.app_interface.resolveNeutralizedText(text)
+
+    async def process_sharepoint_files(self, source_path: str, target_path: str) -> Dict[str, Any]:
+        """
+        Process files from SharePoint source path, neutralize them, and store in target path
+
+        Args:
+            source_path: SharePoint path to read files from
+            target_path: SharePoint path to store neutralized files
+
+        Returns:
+            Dictionary with processing results
+        """
+        try:
+            logger.info(f"Processing SharePoint files from {source_path} to {target_path}")
+
+            # Get user's SharePoint connection that matches the source path
+            sharepoint_connection = await self._get_sharepoint_connection(source_path)
+            if not sharepoint_connection:
+                return {
+                    "success": False,
+                    "message": "No SharePoint connection found for user",
+                    "processed_files": 0,
+                    "errors": ["No SharePoint connection found"]
+                }
+
+            logger.info(f"Using SharePoint connection: {sharepoint_connection.get('id')} for path: {source_path}")
+
+            # Get SharePoint access token
+            sharepoint_token = self.app_interface.getConnectionToken(sharepoint_connection["id"])
+            if not sharepoint_token:
+                return {
+                    "success": False,
+                    "message": "No SharePoint access token found",
+                    "processed_files": 0,
+                    "errors": ["No SharePoint access token found"]
+                }
+
+            # Process files asynchronously
+            return await self._process_sharepoint_files_async(
+                source_path, target_path, sharepoint_token.tokenAccess
+            )
+
+        except Exception as e:
+            logger.error(f"Error processing SharePoint files: {str(e)}")
+            return {
+                "success": False,
+                "message": f"Error processing SharePoint files: {str(e)}",
+                "processed_files": 0,
+                "errors": [str(e)]
+            }
+
+    async def _get_sharepoint_connection(self, sharepoint_path: str = None):
+        """Get user's SharePoint connection that matches the given path"""
+        try:
+            # Get all user connections
+            from modules.interfaces.interfaceAppModel import UserConnection
+            connections = self.app_interface.db.getRecordset(
+                UserConnection,
+                recordFilter={"userId": self.app_interface.userId}
+            )
+
+            # Find all Microsoft connections
+            msft_connections = [conn for conn in connections if conn.get("authority") == "msft"]
+
+            if not msft_connections:
+                logger.warning("No Microsoft connections found for user")
+                return None
+
+            if len(msft_connections) == 1:
+                logger.info(f"Found single Microsoft connection: {msft_connections[0].get('id')}")
+                return msft_connections[0]
+
+            # If multiple connections and we have a path, try to match
+            if sharepoint_path:
+                return await self._match_connection_to_path(msft_connections, sharepoint_path)
+
+            # If no path provided, return the first one
+            logger.info(f"Multiple Microsoft connections found, using first one: {msft_connections[0].get('id')}")
+            return msft_connections[0]
+
+        except Exception as e:
+            logger.error(f"Error getting SharePoint connection: {str(e)}")
+            return None
+
{connection.get('id')}") + return connection + + except Exception as e: + continue + + # If no specific match found, return the first connection + logger.warning(f"No specific connection match found for {target_domain}, using first available") + return connections[0] + + except Exception as e: + logger.error(f"Error matching connection to path: {str(e)}") + return connections[0] if connections else None + + async def _test_sharepoint_access(self, access_token: str, sharepoint_path: str) -> bool: + """Test if the access token can access the given SharePoint path""" + try: + return await self._test_sharepoint_access_async(access_token, sharepoint_path) + except Exception as e: + return False + + async def _test_sharepoint_access_async(self, access_token: str, sharepoint_path: str) -> bool: + """Async test for SharePoint access""" + try: + from modules.connectors.connectorSharepoint import ConnectorSharepoint + + connector = ConnectorSharepoint(access_token=access_token) + + # Parse the path to get site URL + site_url, _ = self._parse_sharepoint_path(sharepoint_path) + if not site_url: + return False + + # Try to find the site + site_info = await connector.find_site_by_web_url(site_url) + return site_info is not None + + except Exception as e: + return False + + async def _process_sharepoint_files_async(self, source_path: str, target_path: str, access_token: str) -> Dict[str, Any]: + """Process SharePoint files asynchronously""" + try: + import asyncio + from modules.connectors.connectorSharepoint import ConnectorSharepoint + + # Initialize SharePoint connector + connector = ConnectorSharepoint(access_token=access_token) + + # Parse source and target paths to extract site and folder info + source_site, source_folder = self._parse_sharepoint_path(source_path) + target_site, target_folder = self._parse_sharepoint_path(target_path) + + if not source_site or not target_site: + return { + "success": False, + "message": "Invalid SharePoint path format", + "processed_files": 0, + "errors": ["Invalid SharePoint path format"] + } + + # Find source site + source_site_info = await connector.find_site_by_web_url(source_site) + if not source_site_info: + return { + "success": False, + "message": f"Source site not found: {source_site}", + "processed_files": 0, + "errors": [f"Source site not found: {source_site}"] + } + + # Find target site + target_site_info = await connector.find_site_by_web_url(target_site) + if not target_site_info: + return { + "success": False, + "message": f"Target site not found: {target_site}", + "processed_files": 0, + "errors": [f"Target site not found: {target_site}"] + } + + # List files in source folder + logger.info(f"Listing files in folder: {source_folder} for site: {source_site_info['id']}") + files = await connector.list_folder_contents(source_site_info["id"], source_folder) + + # If no files found, try listing the root folder to see what's available + if not files: + logger.warning(f"No files found in folder '{source_folder}', trying root folder") + files = await connector.list_folder_contents(source_site_info["id"], "") + + if files: + # List available folders for debugging + folders = [f for f in files if f.get("type") == "folder"] + folder_names = [f.get('name') for f in folders] + logger.info(f"Available folders in root: {folder_names}") + + # Format folder list for better UI display + folder_list = ", ".join(folder_names) if folder_names else "None" + + return { + "success": False, + "message": f"Folder '{source_folder}' not found. 
Available folders in root: {folder_list}", + "processed_files": 0, + "errors": [f"Folder '{source_folder}' not found. Available folders: {folder_list}"], + "available_folders": folder_names + } + else: + return { + "success": False, + "message": f"No files found in source folder: {source_folder}", + "processed_files": 0, + "errors": [f"No files found in source folder: {source_folder}"] + } + + # Filter for text files only + text_files = [f for f in files if f.get("type") == "file" and self._is_text_file(f.get("name", ""))] + + if not text_files: + return { + "success": False, + "message": "No text files found in source folder", + "processed_files": 0, + "errors": ["No text files found in source folder"] + } + + # Process files in parallel for better performance + processed_files = [] + errors = [] + + # Create tasks for parallel processing + async def process_single_file(file_info): + """Process a single file - download, neutralize, upload""" + try: + # Download file + file_content = await connector.download_file(source_site_info["id"], file_info["id"]) + if not file_content: + return {"error": f"Failed to download file: {file_info['name']}"} + + # Convert to text + try: + text_content = file_content.decode('utf-8') + except UnicodeDecodeError: + text_content = file_content.decode('latin-1') + + # Neutralize the text + neutralization_result = self.app_interface.neutralizeText(text_content, file_info["id"]) + + # Create neutralized filename + neutralized_filename = f"neutralized_{file_info['name']}" + + # Upload neutralized file + neutralized_content = neutralization_result["neutralized_text"].encode('utf-8') + upload_result = await connector.upload_file( + target_site_info["id"], + target_folder, + neutralized_filename, + neutralized_content + ) + + if "error" in upload_result: + return {"error": f"Failed to upload neutralized file: {neutralized_filename} - {upload_result['error']}"} + else: + return { + "success": True, + "original_name": file_info["name"], + "neutralized_name": neutralized_filename, + "attributes_count": len(neutralization_result.get("attributes", [])) + } + + except Exception as e: + error_msg = f"Error processing file {file_info['name']}: {str(e)}" + logger.error(error_msg) + return {"error": error_msg} + + # Process all files in parallel + logger.info(f"Processing {len(text_files)} files in parallel...") + tasks = [process_single_file(file_info) for file_info in text_files] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Process results + for i, result in enumerate(results): + if isinstance(result, Exception): + error_msg = f"Exception processing file {text_files[i]['name']}: {str(result)}" + errors.append(error_msg) + logger.error(error_msg) + elif isinstance(result, dict) and "error" in result: + errors.append(result["error"]) + elif isinstance(result, dict) and result.get("success"): + processed_files.append({ + "original_name": result["original_name"], + "neutralized_name": result["neutralized_name"], + "attributes_count": result["attributes_count"] + }) + logger.info(f"Successfully processed file: {result['original_name']} -> {result['neutralized_name']}") + else: + error_msg = f"Unknown result processing file {text_files[i]['name']}: {result}" + errors.append(error_msg) + logger.error(error_msg) + + return { + "success": len(processed_files) > 0, + "message": f"Processed {len(processed_files)} files successfully", + "processed_files": len(processed_files), + "files": processed_files, + "errors": errors + } + + except Exception as e: + 
logger.error(f"Error in async SharePoint processing: {str(e)}") + return { + "success": False, + "message": f"Error in async SharePoint processing: {str(e)}", + "processed_files": 0, + "errors": [str(e)] + } + + def _parse_sharepoint_path(self, path: str) -> tuple[str, str]: + """Parse SharePoint path to extract site URL and folder path""" + try: + # Expected format: https://domain.sharepoint.com/sites/sitename/folder/path + if not path.startswith("https://"): + return None, None + + # Remove query parameters + if "?" in path: + path = path.split("?")[0] + + # Split by /sites/ + if "/sites/" not in path: + return None, None + + parts = path.split("/sites/", 1) + if len(parts) != 2: + return None, None + + # Extract domain and site name + domain = parts[0].replace("https://", "") + site_name = parts[1].split("/")[0] + + # Create proper site URL for Graph API + site_url = f"https://{domain}/sites/{site_name}" + + # Extract folder path (everything after the site name) + folder_parts = parts[1].split("/")[1:] + folder_path = "/".join(folder_parts) if folder_parts else "" + + # URL decode the folder path + from urllib.parse import unquote + folder_path = unquote(folder_path) + + + return site_url, folder_path + + except Exception as e: + logger.error(f"Error parsing SharePoint path '{path}': {str(e)}") + return None, None + + def _is_text_file(self, filename: str) -> bool: + """Check if file is a text file based on extension""" + text_extensions = [ + '.txt', '.csv', '.json', '.xml', '.md', '.log', + '.doc', '.docx', '.rtf', '.odt', # Document formats + '.html', '.htm', '.css', '.js', '.ts', '.py', '.java', '.cpp', '.c', '.h', # Code files + '.ini', '.cfg', '.conf', '.properties', # Config files + '.sql', '.yaml', '.yml', '.toml', # Data/config files + '.ps1', '.bat', '.sh', '.bash' # Script files + ] + return any(filename.lower().endswith(ext) for ext in text_extensions) + + def process_file_content(self, file_content: bytes, file_name: str, mime_type: str) -> Dict[str, Any]: + """ + Process file content for neutralization + + Args: + file_content: Binary file content + file_name: Name of the file + mime_type: MIME type of the file + + Returns: + Dictionary with neutralization results + """ + try: + # Determine content type based on MIME type + content_type = self._get_content_type_from_mime(mime_type) + + # Decode content to text + try: + text_content = file_content.decode('utf-8') + except UnicodeDecodeError: + # Try with different encodings + for encoding in ['latin-1', 'cp1252', 'iso-8859-1']: + try: + text_content = file_content.decode(encoding) + break + except UnicodeDecodeError: + continue + else: + raise ValueError("Unable to decode file content") + + # Generate a temporary file ID for tracking + temp_file_id = str(uuid.uuid4()) + + # Neutralize the content + neutralization_result = self.neutralize_text(text_content, temp_file_id) + + # Encode the neutralized content back to bytes + neutralized_content = neutralization_result["neutralized_text"].encode('utf-8') + + # Generate neutralized file name + neutralized_file_name = f"neutralized_{file_name}" + + return { + "success": True, + "original_content": text_content, + "neutralized_content": neutralization_result["neutralized_text"], + "neutralized_file_name": neutralized_file_name, + "attributes": neutralization_result["attributes"], + "mapping": neutralization_result["mapping"], + "file_id": temp_file_id + } + + except Exception as e: + logger.error(f"Error processing file content: {str(e)}") + return { + "success": False, + "error": 
+    def process_file_content(self, file_content: bytes, file_name: str, mime_type: str) -> Dict[str, Any]:
+        """
+        Process file content for neutralization
+
+        Args:
+            file_content: Binary file content
+            file_name: Name of the file
+            mime_type: MIME type of the file
+
+        Returns:
+            Dictionary with neutralization results
+        """
+        try:
+            # Determine content type based on MIME type
+            content_type = self._get_content_type_from_mime(mime_type)
+
+            # Decode content to text
+            try:
+                text_content = file_content.decode('utf-8')
+            except UnicodeDecodeError:
+                # Try with different encodings
+                for encoding in ['latin-1', 'cp1252', 'iso-8859-1']:
+                    try:
+                        text_content = file_content.decode(encoding)
+                        break
+                    except UnicodeDecodeError:
+                        continue
+                else:
+                    raise ValueError("Unable to decode file content")
+
+            # Generate a temporary file ID for tracking
+            temp_file_id = str(uuid.uuid4())
+
+            # Neutralize the content
+            neutralization_result = self.neutralize_text(text_content, temp_file_id)
+
+            # Encode the neutralized content back to bytes
+            neutralized_content = neutralization_result["neutralized_text"].encode('utf-8')
+
+            # Generate neutralized file name
+            neutralized_file_name = f"neutralized_{file_name}"
+
+            return {
+                "success": True,
+                "original_content": text_content,
+                "neutralized_content": neutralization_result["neutralized_text"],
+                "neutralized_file_name": neutralized_file_name,
+                "attributes": neutralization_result["attributes"],
+                "mapping": neutralization_result["mapping"],
+                "file_id": temp_file_id
+            }
+
+        except Exception as e:
+            logger.error(f"Error processing file content: {str(e)}")
+            return {
+                "success": False,
+                "error": str(e),
+                "original_content": None,
+                "neutralized_content": None
+            }
+
+    def _get_content_type_from_mime(self, mime_type: str) -> str:
+        """Determine content type from MIME type for neutralization processing"""
+        if mime_type.startswith('text/'):
+            return 'text'
+        elif mime_type in ['application/json', 'application/xml', 'text/xml']:
+            return 'json' if 'json' in mime_type else 'xml'
+        elif mime_type in ['text/csv', 'application/csv']:
+            return 'csv'
+        else:
+            return 'text'  # Default to text processing
+
+    def batch_neutralize_files(self, files_data: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """
+        Process multiple files for neutralization
+
+        Args:
+            files_data: List of dictionaries containing file information
+                Each dict should have: content, name, mime_type
+
+        Returns:
+            Dictionary with batch processing results
+        """
+        try:
+            results = []
+            total_files = len(files_data)
+            successful_files = 0
+            errors = []
+
+            for file_data in files_data:
+                try:
+                    result = self.process_file_content(
+                        file_data['content'],
+                        file_data['name'],
+                        file_data['mime_type']
+                    )
+
+                    if result['success']:
+                        successful_files += 1
+                        results.append({
+                            'file_name': file_data['name'],
+                            'neutralized_file_name': result['neutralized_file_name'],
+                            'file_id': result['file_id'],
+                            'attributes_count': len(result['attributes'])
+                        })
+                    else:
+                        errors.append(f"Failed to process {file_data['name']}: {result['error']}")
+
+                except Exception as e:
+                    error_msg = f"Error processing {file_data['name']}: {str(e)}"
+                    errors.append(error_msg)
+                    logger.error(error_msg)
+
+            return {
+                "success": len(errors) == 0,
+                "total_files": total_files,
+                "successful_files": successful_files,
+                "failed_files": len(errors),
+                "results": results,
+                "errors": errors
+            }
+
+        except Exception as e:
+            logger.error(f"Error in batch neutralization: {str(e)}")
+            return {
+                "success": False,
+                "total_files": len(files_data),
+                "successful_files": 0,
+                "failed_files": len(files_data),
+                "results": [],
+                "errors": [str(e)]
+            }
+
+    def cleanup_file_attributes(self, file_id: str) -> bool:
+        """Clean up neutralization attributes for a specific file"""
+        return self.app_interface.deleteNeutralizationAttributes(file_id)
+
+    def get_processing_stats(self) -> Dict[str, Any]:
+        """Get statistics about neutralization processing"""
+        try:
+            # Get all attributes for the current mandate
+            all_attributes = self.get_attributes()
+
+            # Group by pattern type
+            pattern_counts = {}
+            for attr in all_attributes:
+                pattern_type = attr.patternType
+                pattern_counts[pattern_type] = pattern_counts.get(pattern_type, 0) + 1
+
+            # Get unique files
+            unique_files = set(attr.fileId for attr in all_attributes if attr.fileId)
+
+            return {
+                "total_attributes": len(all_attributes),
+                "unique_files": len(unique_files),
+                "pattern_counts": pattern_counts,
+                "mandate_id": self.current_user.mandateId
+            }
+
+        except Exception as e:
+            logger.error(f"Error getting processing stats: {str(e)}")
+            return {
+                "total_attributes": 0,
+                "unique_files": 0,
+                "pattern_counts": {},
+                "error": str(e)
+            }
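The tallying in get_processing_stats is a hand-rolled counter; `collections.Counter` expresses the same aggregation, shown here only as a possible simplification with stand-in records:

```python
from collections import Counter

# Stand-ins for DataNeutralizerAttributes records as (patternType, fileId) pairs
records = [("email", "f1"), ("email", "f2"), ("phone", "f1"), ("name", None)]

pattern_counts = Counter(p for p, _ in records)
unique_files = {f for _, f in records if f}
print(dict(pattern_counts), len(unique_files))
# -> {'email': 2, 'phone': 1, 'name': 1} 2
```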
'{replacement}'") - print(f"DEBUG: Text after replacement: {current_text[:start] + replacement + current_text[end:]}") - + # Replace the matched text while preserving surrounding whitespace current_text = current_text[:start] + replacement + current_text[end:] diff --git a/tests/test_neutralizer/patterns.py b/tests/test_neutralizer/patterns.py index 4cfbed93..e5ae46e3 100644 --- a/tests/test_neutralizer/patterns.py +++ b/tests/test_neutralizer/patterns.py @@ -392,11 +392,5 @@ def find_patterns_in_text(text: str, patterns: List[Pattern]) -> List[tuple]: matches = [] for pattern in patterns: for p in pattern.patterns: - if pattern.name == 'email': - print(f"\nDEBUG: Checking email pattern '{p}'") - for match in re.finditer(p, text, re.IGNORECASE): - if pattern.name == 'email': - print(f"DEBUG: Found email match: '{match.group(0)}' at position {match.start()}-{match.end()}") - print(f"DEBUG: Context: '{text[max(0, match.start()-20):match.end()+20]}'") matches.append((pattern.name, match.group(0), match.start(), match.end())) return sorted(matches, key=lambda x: x[2]) # Sort by start position \ No newline at end of file