From a941963e78438cf500030a9e2f75c183fe30744e Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Wed, 17 Sep 2025 02:12:34 +0200
Subject: [PATCH] neutralizer 1.0
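
Introduce a data-neutralization feature set:

* DataNeutraliserConfig and DataNeutralizerAttributes models with
  per-mandate access control in AppAccess.
* UUID-based [type.uuid] placeholders in the DataAnonymizer, plus
  support for a user-supplied list of names to neutralize.
* REST endpoints for saving the configuration, neutralizing and
  resolving text, batch processing, SharePoint folder processing,
  statistics, and attribute cleanup.
* A NeutralizationService that wires the endpoints to the app
  interface and the SharePoint connector.
* Debug print statements removed from the neutralizer and pattern
  modules.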
---
modules/interfaces/interfaceAppAccess.py | 53 ++
modules/interfaces/interfaceAppModel.py | 120 +++++
modules/interfaces/interfaceAppObjects.py | 209 +++++++-
modules/neutralizer/neutralizer.py | 222 ++++++--
modules/neutralizer/patterns.py | 15 -
modules/routes/routeDataFiles.py | 253 +++++++++-
modules/services/serviceNeutralization.py | 587 ++++++++++++++++++++++
tests/test_neutralizer/neutralizer.py | 6 +-
tests/test_neutralizer/patterns.py | 6 -
9 files changed, 1390 insertions(+), 81 deletions(-)
create mode 100644 modules/services/serviceNeutralization.py
diff --git a/modules/interfaces/interfaceAppAccess.py b/modules/interfaces/interfaceAppAccess.py
index 25b318ad..d91dddc1 100644
--- a/modules/interfaces/interfaceAppAccess.py
+++ b/modules/interfaces/interfaceAppAccess.py
@@ -74,6 +74,28 @@ class AppAccess:
else:
# Regular users only see their own connections
filtered_records = [r for r in recordset if r.get("userId") == self.userId]
+ # Special handling for data neutralization config table
+ elif table_name == "DataNeutraliserConfig":
+ if self.privilege == UserPrivilege.SYSADMIN:
+ # SysAdmin sees all configs
+ filtered_records = recordset
+ elif self.privilege == UserPrivilege.ADMIN:
+ # Admin sees configs in their mandate
+ filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId]
+ else:
+ # Regular users only see their own configs
+ filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId and r.get("userId") == self.userId]
+ # Special handling for data neutralizer attributes table
+ elif table_name == "DataNeutralizerAttributes":
+ if self.privilege == UserPrivilege.SYSADMIN:
+ # SysAdmin sees all attributes
+ filtered_records = recordset
+ elif self.privilege == UserPrivilege.ADMIN:
+ # Admin sees attributes in their mandate
+ filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId]
+ else:
+ # Regular users only see their own attributes
+ filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId and r.get("userId") == self.userId]
# System admins see all other records
elif self.privilege == UserPrivilege.SYSADMIN:
filtered_records = recordset
@@ -126,6 +148,37 @@ class AppAccess:
record["_hideEdit"] = record.get("userId") != self.userId
record["_hideDelete"] = record.get("userId") != self.userId
+ elif table_name == "DataNeutraliserConfig":
+ # Everyone can view configs they have access to
+ record["_hideView"] = False
+ # SysAdmin can edit/delete any config
+ if self.privilege == UserPrivilege.SYSADMIN:
+ record["_hideEdit"] = False
+ record["_hideDelete"] = False
+ # Admin can edit/delete configs in their mandate
+ elif self.privilege == UserPrivilege.ADMIN:
+ record["_hideEdit"] = record.get("mandateId","-") != self.mandateId
+ record["_hideDelete"] = record.get("mandateId","-") != self.mandateId
+ # Regular users can only edit/delete their own configs
+ else:
+ record["_hideEdit"] = record.get("userId") != self.userId
+ record["_hideDelete"] = record.get("userId") != self.userId
+ elif table_name == "DataNeutralizerAttributes":
+ # Everyone can view attributes they have access to
+ record["_hideView"] = False
+ # SysAdmin can edit/delete any attributes
+ if self.privilege == UserPrivilege.SYSADMIN:
+ record["_hideEdit"] = False
+ record["_hideDelete"] = False
+ # Admin can edit/delete attributes in their mandate
+ elif self.privilege == UserPrivilege.ADMIN:
+ record["_hideEdit"] = record.get("mandateId","-") != self.mandateId
+ record["_hideDelete"] = record.get("mandateId","-") != self.mandateId
+ # Regular users can only edit/delete their own attributes
+ else:
+ record["_hideEdit"] = record.get("userId") != self.userId
+ record["_hideDelete"] = record.get("userId") != self.userId
elif table_name == "AuthEvent":
# Only show auth events for the current user or if admin
if self.privilege in [UserPrivilege.SYSADMIN, UserPrivilege.ADMIN]:
diff --git a/modules/interfaces/interfaceAppModel.py b/modules/interfaces/interfaceAppModel.py
index 73d8d146..15e2fbcb 100644
--- a/modules/interfaces/interfaceAppModel.py
+++ b/modules/interfaces/interfaceAppModel.py
@@ -427,6 +427,126 @@ register_model_labels(
}
)
+class DataNeutraliserConfig(BaseModel, ModelMixin):
+ """Data model for data neutralization configuration"""
+ id: str = Field(
+ default_factory=lambda: str(uuid.uuid4()),
+ description="Unique ID of the configuration",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=False
+ )
+ mandateId: str = Field(
+ description="ID of the mandate this configuration belongs to",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=True
+ )
+ userId: str = Field(
+ description="ID of the user who created this configuration",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=True
+ )
+ enabled: bool = Field(
+ default=True,
+ description="Whether data neutralization is enabled",
+ frontend_type="checkbox",
+ frontend_readonly=False,
+ frontend_required=False
+ )
+ namesToParse: str = Field(
+ default="",
+ description="Multiline list of names to parse for neutralization",
+ frontend_type="textarea",
+ frontend_readonly=False,
+ frontend_required=False
+ )
+ sharepointSourcePath: str = Field(
+ default="",
+ description="SharePoint path to read files for neutralization",
+ frontend_type="text",
+ frontend_readonly=False,
+ frontend_required=False
+ )
+ sharepointTargetPath: str = Field(
+ default="",
+ description="SharePoint path to store neutralized files",
+ frontend_type="text",
+ frontend_readonly=False,
+ frontend_required=False
+ )
+
+# Register labels for DataNeutraliserConfig
+register_model_labels(
+ "DataNeutraliserConfig",
+ {"en": "Data Neutralization Config", "fr": "Configuration de neutralisation des données"},
+ {
+ "id": {"en": "ID", "fr": "ID"},
+ "mandateId": {"en": "Mandate ID", "fr": "ID de mandat"},
+ "userId": {"en": "User ID", "fr": "ID utilisateur"},
+ "enabled": {"en": "Enabled", "fr": "Activé"},
+ "namesToParse": {"en": "Names to Parse", "fr": "Noms à analyser"},
+ "sharepointSourcePath": {"en": "Source Path", "fr": "Chemin source"},
+ "sharepointTargetPath": {"en": "Target Path", "fr": "Chemin cible"}
+ }
+)
+
+class DataNeutralizerAttributes(BaseModel, ModelMixin):
+ """Data model for neutralized data attributes mapping"""
+ id: str = Field(
+ default_factory=lambda: str(uuid.uuid4()),
+ description="Unique ID of the attribute mapping (used as UID in neutralized files)",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=False
+ )
+ mandateId: str = Field(
+ description="ID of the mandate this attribute belongs to",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=True
+ )
+ userId: str = Field(
+ description="ID of the user who created this attribute",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=True
+ )
+ originalText: str = Field(
+ description="Original text that was neutralized",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=True
+ )
+ fileId: Optional[str] = Field(
+ default=None,
+ description="ID of the file this attribute belongs to",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=False
+ )
+ patternType: str = Field(
+ description="Type of pattern that matched (email, phone, name, etc.)",
+ frontend_type="text",
+ frontend_readonly=True,
+ frontend_required=True
+ )
+
+# Register labels for DataNeutralizerAttributes
+register_model_labels(
+ "DataNeutralizerAttributes",
+ {"en": "Neutralized Data Attribute", "fr": "Attribut de données neutralisées"},
+ {
+ "id": {"en": "ID", "fr": "ID"},
+ "mandateId": {"en": "Mandate ID", "fr": "ID de mandat"},
+ "userId": {"en": "User ID", "fr": "ID utilisateur"},
+ "originalText": {"en": "Original Text", "fr": "Texte original"},
+ "fileId": {"en": "File ID", "fr": "ID de fichier"},
+ "patternType": {"en": "Pattern Type", "fr": "Type de modèle"}
+ }
+)
+
class SystemTable(BaseModel, ModelMixin):
"""Data model for system table entries"""
table_name: str = Field(
diff --git a/modules/interfaces/interfaceAppObjects.py b/modules/interfaces/interfaceAppObjects.py
index c71e0c03..bc786fe1 100644
--- a/modules/interfaces/interfaceAppObjects.py
+++ b/modules/interfaces/interfaceAppObjects.py
@@ -11,6 +11,7 @@ import importlib
import json
from passlib.context import CryptContext
import uuid
+import re
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
@@ -19,7 +20,8 @@ from modules.interfaces.interfaceAppAccess import AppAccess
from modules.interfaces.interfaceAppModel import (
User, Mandate, UserInDB, UserConnection,
AuthAuthority, UserPrivilege,
- ConnectionStatus, Token, AuthEvent
+ ConnectionStatus, Token, AuthEvent,
+ DataNeutraliserConfig, DataNeutralizerAttributes
)
logger = logging.getLogger(__name__)
@@ -993,6 +995,211 @@ class AppObjects:
logger.error(f"Error during logout: {str(e)}")
raise
+ # Data Neutralization methods
+
+ def getNeutralizationConfig(self) -> Optional[DataNeutraliserConfig]:
+ """Get the data neutralization configuration for the current user's mandate"""
+ try:
+ configs = self.db.getRecordset(DataNeutraliserConfig, recordFilter={"mandateId": self.mandateId})
+ if not configs:
+ return None
+
+ # Apply access control
+ filtered_configs = self._uam(DataNeutraliserConfig, configs)
+ if not filtered_configs:
+ return None
+
+ return DataNeutraliserConfig.from_dict(filtered_configs[0])
+
+ except Exception as e:
+ logger.error(f"Error getting neutralization config: {str(e)}")
+ return None
+
+ def createOrUpdateNeutralizationConfig(self, config_data: Dict[str, Any]) -> DataNeutraliserConfig:
+ """Create or update the data neutralization configuration"""
+ try:
+ # Check if config already exists
+ existing_config = self.getNeutralizationConfig()
+
+ if existing_config:
+ # Update existing config
+ update_data = existing_config.to_dict()
+ update_data.update(config_data)
+ update_data["updatedAt"] = get_utc_timestamp()
+
+ updated_config = DataNeutraliserConfig.from_dict(update_data)
+ self.db.recordModify(DataNeutraliserConfig, existing_config.id, updated_config)
+
+ return updated_config
+ else:
+ # Create new config
+ config_data["mandateId"] = self.mandateId
+ config_data["userId"] = self.userId
+
+ new_config = DataNeutraliserConfig.from_dict(config_data)
+ created_record = self.db.recordCreate(DataNeutraliserConfig, new_config)
+
+ return DataNeutraliserConfig.from_dict(created_record)
+
+ except Exception as e:
+ logger.error(f"Error creating/updating neutralization config: {str(e)}")
+ raise ValueError(f"Failed to create/update neutralization config: {str(e)}")
+
+ def neutralizeText(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]:
+ """Neutralize text content and store attribute mappings"""
+ try:
+ from modules.neutralizer.neutralizer import DataAnonymizer
+
+ # Get neutralization configuration to extract namesToParse
+ config = self.getNeutralizationConfig()
+ names_to_parse = []
+ if config and hasattr(config, 'namesToParse') and config.namesToParse:
+ # Split by newlines and filter out empty strings
+ names_to_parse = [name.strip() for name in config.namesToParse.split('\n') if name.strip()]
+
+ # Initialize anonymizer with custom names
+ anonymizer = DataAnonymizer(names_to_parse=names_to_parse)
+
+ # Process the text
+ result = anonymizer.process_content(text, 'text')
+
+ # Store attribute mappings in database
+ stored_attributes = []
+ for original_text, neutralized_text in result.mapping.items():
+ # Extract pattern type and UUID from the neutralized text format [type.uuid]
+ pattern_type = "unknown"
+ placeholder_uuid = None
+
+ if neutralized_text.startswith("[") and "." in neutralized_text and neutralized_text.endswith("]"):
+ # Extract type and UUID from [type.uuid] format
+ inner = neutralized_text[1:-1] # Remove [ and ]
+ if "." in inner:
+ pattern_type, placeholder_uuid = inner.split(".", 1)
+
+ # Check if this exact original text already has a placeholder in the database
+ existing_attribute = self.getExistingPlaceholder(original_text)
+
+ if existing_attribute:
+ # Reuse existing placeholder
+ existing_uuid = existing_attribute.id
+ existing_pattern_type = existing_attribute.patternType
+
+ # Update the neutralized text to use the existing UUID
+ result.data = result.data.replace(neutralized_text, f"[{existing_pattern_type}.{existing_uuid}]")
+ result.mapping[original_text] = f"[{existing_pattern_type}.{existing_uuid}]"
+
+ stored_attributes.append(existing_attribute)
+ else:
+ # Create new attribute record with the UUID that the neutralizer generated
+ attribute_data = {
+ "id": placeholder_uuid, # Use the UUID from the neutralizer
+ "mandateId": self.mandateId,
+ "userId": self.userId,
+ "originalText": original_text,
+ "fileId": file_id,
+ "patternType": pattern_type
+ }
+
+ attribute = DataNeutralizerAttributes.from_dict(attribute_data)
+ created_attribute = self.db.recordCreate(DataNeutralizerAttributes, attribute)
+ stored_attributes.append(created_attribute)
+
+            # result.data already contains the final [type.uuid] placeholders
+
+ return {
+ "neutralized_text": result.data,
+ "attributes": stored_attributes,
+ "mapping": result.mapping,
+ "replaced_fields": result.replaced_fields,
+ "processed_info": result.processed_info
+ }
+
+ except Exception as e:
+ logger.error(f"Error neutralizing text: {str(e)}")
+ raise ValueError(f"Failed to neutralize text: {str(e)}")
+
+ def getExistingPlaceholder(self, original_text: str) -> Optional[DataNeutralizerAttributes]:
+ """Get existing placeholder for original text if it exists"""
+ try:
+ existing_attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter={
+ "mandateId": self.mandateId,
+ "userId": self.userId,
+ "originalText": original_text
+ })
+
+ if existing_attributes:
+ return DataNeutralizerAttributes.from_dict(existing_attributes[0])
+ return None
+
+ except Exception as e:
+ logger.error(f"Error getting existing placeholder: {str(e)}")
+ return None
+
+ def getNeutralizationAttributes(self, file_id: Optional[str] = None) -> List[DataNeutralizerAttributes]:
+ """Get neutralization attributes, optionally filtered by file ID"""
+ try:
+ filter_dict = {"mandateId": self.mandateId}
+ if file_id:
+ filter_dict["fileId"] = file_id
+
+ attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter=filter_dict)
+ filtered_attributes = self._uam(DataNeutralizerAttributes, attributes)
+
+ return [DataNeutralizerAttributes.from_dict(attr) for attr in filtered_attributes]
+
+ except Exception as e:
+ logger.error(f"Error getting neutralization attributes: {str(e)}")
+ return []
+
+ def resolveNeutralizedText(self, text: str) -> str:
+ """Resolve UIDs in neutralized text back to original text"""
+ try:
+ # Find all placeholders in the new format [type.uuid]
+ placeholder_pattern = r'\[([a-z]+)\.([a-f0-9-]{36})\]'
+ matches = re.findall(placeholder_pattern, text)
+
+ resolved_text = text
+ for placeholder_type, uid in matches:
+ # Find the attribute with this UID (which is the record ID)
+ attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter={
+ "mandateId": self.mandateId,
+ "id": uid
+ })
+
+ if attributes:
+ attribute = attributes[0]
+ # Replace placeholder with original text
+ placeholder = f"[{placeholder_type}.{uid}]"
+ resolved_text = resolved_text.replace(placeholder, attribute["originalText"])
+ else:
+ logger.warning(f"No attribute found for UID {uid}")
+
+ return resolved_text
+
+ except Exception as e:
+ logger.error(f"Error resolving neutralized text: {str(e)}")
+ return text
+
+ def deleteNeutralizationAttributes(self, file_id: str) -> bool:
+ """Delete all neutralization attributes for a specific file"""
+ try:
+ attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter={
+ "mandateId": self.mandateId,
+ "fileId": file_id
+ })
+
+ for attribute in attributes:
+ self.db.recordDelete(DataNeutralizerAttributes, attribute["id"])
+
+ logger.info(f"Deleted {len(attributes)} neutralization attributes for file {file_id}")
+ return True
+
+ except Exception as e:
+ logger.error(f"Error deleting neutralization attributes: {str(e)}")
+ return False
+
# Public Methods
def getInterface(currentUser: User) -> AppObjects:
diff --git a/modules/neutralizer/neutralizer.py b/modules/neutralizer/neutralizer.py
index 6d722f29..87427611 100644
--- a/modules/neutralizer/neutralizer.py
+++ b/modules/neutralizer/neutralizer.py
@@ -50,10 +50,15 @@ class ProcessResult:
class DataAnonymizer:
"""Hauptklasse für die Datenanonymisierung"""
- def __init__(self):
- """Initialize the anonymizer with patterns"""
+ def __init__(self, names_to_parse: List[str] = None):
+ """Initialize the anonymizer with patterns and custom names
+
+ Args:
+ names_to_parse: List of names to parse and replace (case-insensitive)
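+
+        Example (illustrative, UUID shortened):
+            anonymizer = DataAnonymizer(names_to_parse=["Anna Muster"])
+            result = anonymizer.process_content("Mail Anna Muster", "text")
+            # result.data -> "Mail [name.3f2b...]"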
+ """
self.header_patterns = HeaderPatterns.patterns
self.data_patterns = DataPatterns.patterns
+ self.names_to_parse = names_to_parse or []
self.replaced_fields = set()
self.mapping = {}
self.processing_info = []
@@ -63,6 +68,7 @@ class DataAnonymizer:
text = re.sub(r'\s+', ' ', text)
text = text.replace('\r\n', '\n').replace('\r', '\n')
return text.strip()
+
+    def _find_custom_names(self, text: str) -> List[str]:
+        """Return the configured custom names that occur in the given text.
+
+        Matching is case-insensitive with word boundaries. NOTE: this helper
+        is assumed by the JSON/XML anonymization paths below; a minimal
+        implementation is provided here.
+        """
+        found = []
+        for name in self.names_to_parse:
+            name = name.strip()
+            if name and re.search(r'\b' + re.escape(name) + r'\b', text, re.IGNORECASE):
+                found.append(name)
+        return found
+
def _is_table_line(self, line: str) -> bool:
"""Check if a line represents a table row"""
@@ -110,46 +116,72 @@ class DataAnonymizer:
except Exception as e:
logger.error(f"Error anonymizing table: {str(e)}")
- logger.debug(traceback.format_exc())
raise
def _anonymize_plain_text(self, text: PlainText) -> PlainText:
- """Anonymize plain text content"""
+ """Anonymize plain text content using simple search-and-replace approach"""
try:
- # Process the entire text at once instead of line by line
current_text = text.content
- # Find all matches in the entire text
- matches = find_patterns_in_text(current_text, self.data_patterns)
-
- # Process matches in reverse order to avoid position shifting
- for match in sorted(matches, key=lambda x: x[2], reverse=True):
- pattern_name, matched_text, start, end = match
+            import uuid  # one local import for all placeholder IDs in this method
+
+            # Step 1: Replace custom names first (simple regex search-and-replace)
+ for name in self.names_to_parse:
+ if not name.strip():
+ continue
+
+ # Create case-insensitive regex pattern with word boundaries
+ pattern = re.compile(r'\b' + re.escape(name.strip()) + r'\b', re.IGNORECASE)
- # Skip if the matched text is already a placeholder
- if re.match(r'\[[A-Z_]+\d+\]', matched_text):
+ # Find all matches for this name
+ matches = list(pattern.finditer(current_text))
+
+ # Replace each match with a placeholder
+ for match in reversed(matches): # Process from right to left to avoid position shifts
+ matched_text = match.group()
+ if matched_text not in self.mapping:
+                    # Create a [name.uuid] placeholder for this custom name
+ placeholder_id = str(uuid.uuid4())
+ self.mapping[matched_text] = f"[name.{placeholder_id}]"
+
+ replacement = self.mapping[matched_text]
+ start, end = match.span()
+ current_text = current_text[:start] + replacement + current_text[end:]
+
+ # Step 2: Replace pattern-based matches (emails, phones, etc.)
+ # Use the same simple approach for patterns
+ pattern_matches = find_patterns_in_text(current_text, self.data_patterns)
+
+ # Process pattern matches from right to left to avoid position shifts
+ for pattern_name, matched_text, start, end in reversed(pattern_matches):
+ # Skip if already a placeholder
+ if re.match(r'\[[a-z]+\.[a-f0-9-]+\]', matched_text):
continue
- # Find the pattern that matched
- pattern = next((p for p in self.data_patterns if p.name == pattern_name), None)
- if pattern:
- # Use the pattern's replacement template
- if matched_text not in self.mapping:
- self.mapping[matched_text] = pattern.replacement_template.format(len(self.mapping) + 1)
- replacement = self.mapping[matched_text]
-
- if pattern_name == 'email':
- print(f"DEBUG: Replacing email '{matched_text}' with '{replacement}'")
- print(f"DEBUG: Text after replacement: {current_text[:start] + replacement + current_text[end:]}")
+ # Skip if contains placeholder characters
+ if '[' in matched_text or ']' in matched_text:
+ continue
- # Replace the matched text while preserving surrounding whitespace
+ if matched_text not in self.mapping:
+                # Create a [type.uuid] placeholder; only known types are kept, others become 'data'
+                placeholder_id = str(uuid.uuid4())
+                known_types = {'email', 'phone', 'address', 'id'}
+                placeholder_type = pattern_name if pattern_name in known_types else 'data'
+ self.mapping[matched_text] = f"[{placeholder_type}.{placeholder_id}]"
+
+ replacement = self.mapping[matched_text]
current_text = current_text[:start] + replacement + current_text[end:]
return PlainText(content=current_text, source_type=text.source_type)
except Exception as e:
logger.error(f"Error anonymizing plain text: {str(e)}")
- logger.debug(traceback.format_exc())
raise
def _anonymize_json_value(self, value: Any, key: str = None) -> Any:
@@ -173,16 +205,49 @@ class DataAnonymizer:
pattern = get_pattern_for_header(key, self.header_patterns)
if pattern:
if value not in self.mapping:
- self.mapping[value] = pattern.replacement_template.format(len(self.mapping) + 1)
+                        # Create a [type.uuid] placeholder; only known types are kept, others become 'data'
+                        import uuid
+                        placeholder_id = str(uuid.uuid4())
+                        known_types = {'email', 'phone', 'name', 'address', 'id'}
+                        placeholder_type = pattern.name if pattern.name in known_types else 'data'
+                        self.mapping[value] = f"[{placeholder_type}.{placeholder_id}]"
return self.mapping[value]
# Check if the value itself matches any patterns
- matches = find_patterns_in_text(value, self.data_patterns)
- if matches:
- # Use the first match's pattern
- pattern_name = matches[0][0]
- if value not in self.mapping:
- self.mapping[value] = f"{pattern_name.upper()}_{len(self.mapping) + 1}"
+ pattern_matches = find_patterns_in_text(value, self.data_patterns)
+ custom_name_matches = self._find_custom_names(value)
+
+ if pattern_matches or custom_name_matches:
+ # Use the first match's pattern or custom name
+ if pattern_matches:
+ pattern_name = pattern_matches[0][0]
+ if value not in self.mapping:
+                        # Create a [type.uuid] placeholder; only known types are kept, others become 'data'
+                        import uuid
+                        placeholder_id = str(uuid.uuid4())
+                        known_types = {'email', 'phone', 'name', 'address', 'id'}
+                        placeholder_type = pattern_name if pattern_name in known_types else 'data'
+ self.mapping[value] = f"[{placeholder_type}.{placeholder_id}]"
+ elif custom_name_matches:
+ if value not in self.mapping:
+ # Generate a UUID for the placeholder
+ import uuid
+ placeholder_id = str(uuid.uuid4())
+ self.mapping[value] = f"[name.{placeholder_id}]"
return self.mapping[value]
return value
@@ -207,7 +272,19 @@ class DataAnonymizer:
pattern = get_pattern_for_header(attr_name, self.header_patterns)
if pattern:
if attr_value not in self.mapping:
- self.mapping[attr_value] = pattern.replacement_template.format(len(self.mapping) + 1)
+                    # Create a [type.uuid] placeholder; only known types are kept, others become 'data'
+                    import uuid
+                    placeholder_id = str(uuid.uuid4())
+                    known_types = {'email', 'phone', 'name', 'address', 'id'}
+                    placeholder_type = pattern.name if pattern.name in known_types else 'data'
+ self.mapping[attr_value] = f"[{placeholder_type}.{placeholder_id}]"
processed_attrs[attr_name] = self.mapping[attr_value]
else:
# Check if attribute value matches any data patterns
@@ -217,7 +294,19 @@ class DataAnonymizer:
pattern = next((p for p in self.data_patterns if p.name == pattern_name), None)
if pattern:
if attr_value not in self.mapping:
- self.mapping[attr_value] = pattern.replacement_template.format(len(self.mapping) + 1)
+                        # Create a [type.uuid] placeholder; only known types are kept, others become 'data'
+                        import uuid
+                        placeholder_id = str(uuid.uuid4())
+                        known_types = {'email', 'phone', 'name', 'address', 'id'}
+                        placeholder_type = pattern_name if pattern_name in known_types else 'data'
+ self.mapping[attr_value] = f"[{placeholder_type}.{placeholder_id}]"
processed_attrs[attr_name] = self.mapping[attr_value]
else:
processed_attrs[attr_name] = attr_value
@@ -230,14 +319,36 @@ class DataAnonymizer:
# Process text content
text = element.text.strip() if element.text and element.text.strip() else ''
if text:
- # Check if text matches any patterns
- matches = find_patterns_in_text(text, self.data_patterns)
- if matches:
- pattern_name = matches[0][0]
- pattern = next((p for p in self.data_patterns if p.name == pattern_name), None)
- if pattern:
+ # Check if text matches any patterns or custom names
+ pattern_matches = find_patterns_in_text(text, self.data_patterns)
+ custom_name_matches = self._find_custom_names(text)
+
+ if pattern_matches or custom_name_matches:
+ if pattern_matches:
+ pattern_name = pattern_matches[0][0]
+ pattern = next((p for p in self.data_patterns if p.name == pattern_name), None)
+ if pattern:
+ if text not in self.mapping:
+                            # Create a [type.uuid] placeholder; only known types are kept, others become 'data'
+                            import uuid
+                            placeholder_id = str(uuid.uuid4())
+                            known_types = {'email', 'phone', 'name', 'address', 'id'}
+                            placeholder_type = pattern_name if pattern_name in known_types else 'data'
+ self.mapping[text] = f"[{placeholder_type}.{placeholder_id}]"
+ text = self.mapping[text]
+ elif custom_name_matches:
if text not in self.mapping:
- self.mapping[text] = pattern.replacement_template.format(len(self.mapping) + 1)
+ # Generate a UUID for the placeholder
+ import uuid
+ placeholder_id = str(uuid.uuid4())
+ self.mapping[text] = f"[name.{placeholder_id}]"
text = self.mapping[text]
# Process child elements
@@ -271,18 +382,24 @@ class DataAnonymizer:
ProcessResult: Contains anonymized data, mapping, replaced fields and processing info
"""
try:
# Check if content is binary data
is_binary = False
try:
- # Try to decode base64 if it's a string
- try:
- decoded = base64.b64decode(content)
- # If it's not valid text, consider it binary
- decoded.decode('utf-8')
- except (base64.binascii.Error, UnicodeDecodeError):
- is_binary = True
- except Exception:
- is_binary = True
+            # Treat content as binary only if it is non-empty, consists solely of
+            # base64 characters, and round-trips through base64 to valid UTF-8
+            if re.match(r'^[A-Za-z0-9+/]+={0,2}$', content.strip()):
+                try:
+                    decoded = base64.b64decode(content)
+                    decoded.decode('utf-8')
+                    # Well-formed base64 payload: treat as an encoded (binary) upload
+                    is_binary = True
+                except (base64.binascii.Error, UnicodeDecodeError):
+                    # Not actually decodable base64: process as plain text
+                    is_binary = False
+            else:
+                is_binary = False
+        except Exception:
+            is_binary = False
if is_binary:
# TODO: Implement binary data neutralization
@@ -364,5 +481,4 @@ class DataAnonymizer:
except Exception as e:
logger.error(f"Error processing content: {str(e)}")
- logger.debug(traceback.format_exc())
return ProcessResult(None, self.mapping, [], {'type': 'error', 'error': str(e)})
\ No newline at end of file
diff --git a/modules/neutralizer/patterns.py b/modules/neutralizer/patterns.py
index 4cfbed93..175a690f 100644
--- a/modules/neutralizer/patterns.py
+++ b/modules/neutralizer/patterns.py
@@ -232,16 +232,6 @@ class HeaderPatterns:
class DataPatterns:
"""Patterns for identifying sensitive data in content"""
patterns = [
- # Name patterns
- Pattern(
- name="name",
- patterns=[
- # Person names with titles and academic degrees
- r'\b(?:Dr\.|Prof\.|PhD\.?|MD\.?|Herr|Frau|Mr\.|Mrs\.|Ms\.|Monsieur|Madame|Signore|Signora)\s+[A-Z][a-z]{2,}(?:\s+[A-Za-z]{2,}){1,2}\b'
- ],
- replacement_template="[NAME_{}]"
- ),
-
# Email pattern for plain text
Pattern(
name="email",
@@ -392,11 +382,6 @@ def find_patterns_in_text(text: str, patterns: List[Pattern]) -> List[tuple]:
matches = []
for pattern in patterns:
for p in pattern.patterns:
- if pattern.name == 'email':
- print(f"\nDEBUG: Checking email pattern '{p}'")
for match in re.finditer(p, text, re.IGNORECASE):
- if pattern.name == 'email':
- print(f"DEBUG: Found email match: '{match.group(0)}' at position {match.start()}-{match.end()}")
- print(f"DEBUG: Context: '{text[max(0, match.start()-20):match.end()+20]}'")
matches.append((pattern.name, match.group(0), match.start(), match.end()))
return sorted(matches, key=lambda x: x[2]) # Sort by start position
\ No newline at end of file
diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py
index 2be59f21..3243bc21 100644
--- a/modules/routes/routeDataFiles.py
+++ b/modules/routes/routeDataFiles.py
@@ -17,7 +17,8 @@ from modules.security.auth import limiter, getCurrentUser
import modules.interfaces.interfaceComponentObjects as interfaceComponentObjects
from modules.interfaces.interfaceComponentModel import FileItem, FilePreview
from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse, AttributeDefinition
-from modules.interfaces.interfaceAppModel import User
+from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes
+from modules.services.serviceNeutralization import NeutralizationService
# Configure logger
logger = logging.getLogger(__name__)
@@ -364,3 +365,253 @@ async def preview_file(
detail=f"Error previewing file: {str(e)}"
)
+# Data Neutralization endpoints
+
+@router.get("/neutralization/config", response_model=DataNeutraliserConfig)
+@limiter.limit("30/minute")
+async def get_neutralization_config(
+ request: Request,
+ currentUser: User = Depends(getCurrentUser)
+) -> DataNeutraliserConfig:
+ """Get data neutralization configuration"""
+ try:
+ service = NeutralizationService(currentUser)
+ config = service.get_config()
+
+ if not config:
+ # Return default config instead of 404
+ return DataNeutraliserConfig(
+ mandateId=currentUser.mandateId,
+ userId=currentUser.id,
+ enabled=True,
+ namesToParse="",
+ sharepointSourcePath="",
+ sharepointTargetPath=""
+ )
+
+ return config
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error getting neutralization config: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error getting neutralization config: {str(e)}"
+ )
+
+@router.post("/neutralization/config", response_model=DataNeutraliserConfig)
+@limiter.limit("10/minute")
+async def save_neutralization_config(
+ request: Request,
+ config_data: Dict[str, Any] = Body(...),
+ currentUser: User = Depends(getCurrentUser)
+) -> DataNeutraliserConfig:
+ """Save or update data neutralization configuration"""
+ try:
+ service = NeutralizationService(currentUser)
+ config = service.save_config(config_data)
+
+ return config
+
+ except Exception as e:
+ logger.error(f"Error saving neutralization config: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error saving neutralization config: {str(e)}"
+ )
+
+@router.post("/neutralization/neutralize-text", response_model=Dict[str, Any])
+@limiter.limit("20/minute")
+async def neutralize_text(
+ request: Request,
+ text_data: Dict[str, Any] = Body(...),
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+ """Neutralize text content"""
+ try:
+ text = text_data.get("text", "")
+ file_id = text_data.get("fileId")
+
+ if not text:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="Text content is required"
+ )
+
+ service = NeutralizationService(currentUser)
+ result = service.neutralize_text(text, file_id)
+
+ return result
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error neutralizing text: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error neutralizing text: {str(e)}"
+ )
+
+@router.post("/neutralization/resolve-text", response_model=Dict[str, str])
+@limiter.limit("20/minute")
+async def resolve_text(
+ request: Request,
+ text_data: Dict[str, str] = Body(...),
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, str]:
+ """Resolve UIDs in neutralized text back to original text"""
+ try:
+ text = text_data.get("text", "")
+
+ if not text:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="Text content is required"
+ )
+
+ service = NeutralizationService(currentUser)
+ resolved_text = service.resolve_text(text)
+
+ return {"resolved_text": resolved_text}
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error resolving text: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error resolving text: {str(e)}"
+ )
+
+@router.get("/neutralization/attributes", response_model=List[DataNeutralizerAttributes])
+@limiter.limit("30/minute")
+async def get_neutralization_attributes(
+ request: Request,
+ fileId: Optional[str] = Query(None, description="Filter by file ID"),
+ currentUser: User = Depends(getCurrentUser)
+) -> List[DataNeutralizerAttributes]:
+ """Get neutralization attributes, optionally filtered by file ID"""
+ try:
+ service = NeutralizationService(currentUser)
+ attributes = service.get_attributes(fileId)
+
+ return attributes
+
+ except Exception as e:
+ logger.error(f"Error getting neutralization attributes: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error getting neutralization attributes: {str(e)}"
+ )
+
+@router.post("/neutralization/process-sharepoint", response_model=Dict[str, Any])
+@limiter.limit("5/minute")
+async def process_sharepoint_files(
+ request: Request,
+ paths_data: Dict[str, str] = Body(...),
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+ """Process files from SharePoint source path and store neutralized files in target path"""
+ try:
+ source_path = paths_data.get("sourcePath", "")
+ target_path = paths_data.get("targetPath", "")
+
+ if not source_path or not target_path:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="Both source and target paths are required"
+ )
+
+ service = NeutralizationService(currentUser)
+ result = await service.process_sharepoint_files(source_path, target_path)
+
+ return result
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error processing SharePoint files: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error processing SharePoint files: {str(e)}"
+ )
+
+@router.post("/neutralization/batch-process", response_model=Dict[str, Any])
+@limiter.limit("10/minute")
+async def batch_process_files(
+ request: Request,
+ files_data: List[Dict[str, Any]] = Body(...),
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+ """Process multiple files for neutralization"""
+ try:
+ if not files_data:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="Files data is required"
+ )
+
+ service = NeutralizationService(currentUser)
+ result = service.batch_neutralize_files(files_data)
+
+ return result
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error batch processing files: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error batch processing files: {str(e)}"
+ )
+
+@router.get("/neutralization/stats", response_model=Dict[str, Any])
+@limiter.limit("30/minute")
+async def get_neutralization_stats(
+ request: Request,
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+ """Get neutralization processing statistics"""
+ try:
+ service = NeutralizationService(currentUser)
+ stats = service.get_processing_stats()
+
+ return stats
+
+ except Exception as e:
+ logger.error(f"Error getting neutralization stats: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error getting neutralization stats: {str(e)}"
+ )
+
+@router.delete("/neutralization/attributes/{fileId}", response_model=Dict[str, str])
+@limiter.limit("10/minute")
+async def cleanup_file_attributes(
+ request: Request,
+ fileId: str = Path(..., description="File ID to cleanup attributes for"),
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, str]:
+ """Clean up neutralization attributes for a specific file"""
+ try:
+ service = NeutralizationService(currentUser)
+ success = service.cleanup_file_attributes(fileId)
+
+ if success:
+ return {"message": f"Successfully cleaned up attributes for file {fileId}"}
+ else:
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail="Failed to cleanup file attributes"
+ )
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error cleaning up file attributes: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error cleaning up file attributes: {str(e)}"
+ )
diff --git a/modules/services/serviceNeutralization.py b/modules/services/serviceNeutralization.py
new file mode 100644
index 00000000..e5c75a37
--- /dev/null
+++ b/modules/services/serviceNeutralization.py
@@ -0,0 +1,587 @@
+"""
+Data Neutralization Service
+Handles file processing for data neutralization including SharePoint integration
+"""
+
+import asyncio
+import logging
+import uuid
+from typing import Dict, List, Any, Optional, Tuple
+from urllib.parse import unquote, urlparse
+
+from modules.interfaces.interfaceAppObjects import getInterface
+from modules.interfaces.interfaceAppModel import (
+    User, UserConnection, DataNeutraliserConfig, DataNeutralizerAttributes
+)
+
+logger = logging.getLogger(__name__)
+
+class NeutralizationService:
+ """Service for handling data neutralization operations"""
+
+ def __init__(self, current_user: User):
+ """Initialize the service with user context"""
+ self.current_user = current_user
+ self.app_interface = getInterface(current_user)
+
+ def get_config(self) -> Optional[DataNeutraliserConfig]:
+ """Get the neutralization configuration for the current user's mandate"""
+ return self.app_interface.getNeutralizationConfig()
+
+ def save_config(self, config_data: Dict[str, Any]) -> DataNeutraliserConfig:
+ """Save or update the neutralization configuration"""
+ return self.app_interface.createOrUpdateNeutralizationConfig(config_data)
+
+ def neutralize_text(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]:
+ """Neutralize text content and return results with attribute mappings"""
+ return self.app_interface.neutralizeText(text, file_id)
+
+ def get_attributes(self, file_id: Optional[str] = None) -> List[DataNeutralizerAttributes]:
+ """Get neutralization attributes, optionally filtered by file ID"""
+ return self.app_interface.getNeutralizationAttributes(file_id)
+
+ def resolve_text(self, text: str) -> str:
+ """Resolve UIDs in neutralized text back to original text"""
+ return self.app_interface.resolveNeutralizedText(text)
+
+ async def process_sharepoint_files(self, source_path: str, target_path: str) -> Dict[str, Any]:
+ """
+ Process files from SharePoint source path, neutralize them, and store in target path
+
+ Args:
+ source_path: SharePoint path to read files from
+ target_path: SharePoint path to store neutralized files
+
+ Returns:
+ Dictionary with processing results
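+
+        Example result (illustrative):
+            {"success": True, "message": "Processed 2 files successfully",
+             "processed_files": 2, "files": [...], "errors": []}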
+ """
+ try:
+ logger.info(f"Processing SharePoint files from {source_path} to {target_path}")
+
+ # Get user's SharePoint connection that matches the source path
+ sharepoint_connection = await self._get_sharepoint_connection(source_path)
+ if not sharepoint_connection:
+ return {
+ "success": False,
+ "message": "No SharePoint connection found for user",
+ "processed_files": 0,
+ "errors": ["No SharePoint connection found"]
+ }
+
+ logger.info(f"Using SharePoint connection: {sharepoint_connection.get('id')} for path: {source_path}")
+
+ # Get SharePoint access token
+ sharepoint_token = self.app_interface.getConnectionToken(sharepoint_connection["id"])
+ if not sharepoint_token:
+ return {
+ "success": False,
+ "message": "No SharePoint access token found",
+ "processed_files": 0,
+ "errors": ["No SharePoint access token found"]
+ }
+
+ # Process files asynchronously
+ return await self._process_sharepoint_files_async(
+ source_path, target_path, sharepoint_token.tokenAccess
+ )
+
+ except Exception as e:
+ logger.error(f"Error processing SharePoint files: {str(e)}")
+ return {
+ "success": False,
+ "message": f"Error processing SharePoint files: {str(e)}",
+ "processed_files": 0,
+ "errors": [str(e)]
+ }
+
+    async def _get_sharepoint_connection(self, sharepoint_path: Optional[str] = None):
+ """Get user's SharePoint connection that matches the given path"""
+ try:
+ # Get all user connections
+ connections = self.app_interface.db.getRecordset(
+ UserConnection,
+ recordFilter={"userId": self.app_interface.userId}
+ )
+
+ # Find all Microsoft connections
+ msft_connections = [conn for conn in connections if conn.get("authority") == "msft"]
+
+ if not msft_connections:
+ logger.warning("No Microsoft connections found for user")
+ return None
+
+ if len(msft_connections) == 1:
+ logger.info(f"Found single Microsoft connection: {msft_connections[0].get('id')}")
+ return msft_connections[0]
+
+ # If multiple connections and we have a path, try to match
+ if sharepoint_path:
+ return await self._match_connection_to_path(msft_connections, sharepoint_path)
+
+ # If no path provided, return the first one
+ logger.info(f"Multiple Microsoft connections found, using first one: {msft_connections[0].get('id')}")
+ return msft_connections[0]
+
+ except Exception as e:
+ logger.error(f"Error getting SharePoint connection: {str(e)}")
+ return None
+
+ async def _match_connection_to_path(self, connections: list, sharepoint_path: str):
+ """Match a connection to the SharePoint path by testing access"""
+ try:
+ # Extract domain from the path
+ parsed_url = urlparse(sharepoint_path)
+ target_domain = parsed_url.netloc.lower()
+
+ logger.info(f"Looking for connection matching domain: {target_domain}")
+
+ # Try each connection to see which one can access the site
+ for connection in connections:
+ try:
+ # Get token for this connection
+ token = self.app_interface.getConnectionToken(connection["id"])
+ if not token:
+ continue
+
+ # Test if this connection can access the SharePoint site
+ if await self._test_sharepoint_access(token.tokenAccess, sharepoint_path):
+ logger.info(f"Found matching connection for domain {target_domain}: {connection.get('id')}")
+ return connection
+
+                except Exception:
+                    # Token retrieval or access test failed; try the next connection
+                    continue
+
+ # If no specific match found, return the first connection
+ logger.warning(f"No specific connection match found for {target_domain}, using first available")
+ return connections[0]
+
+ except Exception as e:
+ logger.error(f"Error matching connection to path: {str(e)}")
+ return connections[0] if connections else None
+
+    async def _test_sharepoint_access(self, access_token: str, sharepoint_path: str) -> bool:
+        """Test whether the access token can access the given SharePoint path"""
+        try:
+            from modules.connectors.connectorSharepoint import ConnectorSharepoint
+
+            connector = ConnectorSharepoint(access_token=access_token)
+
+            # Parse the path to get the site URL
+            site_url, _ = self._parse_sharepoint_path(sharepoint_path)
+            if not site_url:
+                return False
+
+            # Try to find the site
+            site_info = await connector.find_site_by_web_url(site_url)
+            return site_info is not None
+
+        except Exception:
+            return False
+
+ async def _process_sharepoint_files_async(self, source_path: str, target_path: str, access_token: str) -> Dict[str, Any]:
+ """Process SharePoint files asynchronously"""
+ try:
+ from modules.connectors.connectorSharepoint import ConnectorSharepoint
+
+ # Initialize SharePoint connector
+ connector = ConnectorSharepoint(access_token=access_token)
+
+ # Parse source and target paths to extract site and folder info
+ source_site, source_folder = self._parse_sharepoint_path(source_path)
+ target_site, target_folder = self._parse_sharepoint_path(target_path)
+
+ if not source_site or not target_site:
+ return {
+ "success": False,
+ "message": "Invalid SharePoint path format",
+ "processed_files": 0,
+ "errors": ["Invalid SharePoint path format"]
+ }
+
+ # Find source site
+ source_site_info = await connector.find_site_by_web_url(source_site)
+ if not source_site_info:
+ return {
+ "success": False,
+ "message": f"Source site not found: {source_site}",
+ "processed_files": 0,
+ "errors": [f"Source site not found: {source_site}"]
+ }
+
+ # Find target site
+ target_site_info = await connector.find_site_by_web_url(target_site)
+ if not target_site_info:
+ return {
+ "success": False,
+ "message": f"Target site not found: {target_site}",
+ "processed_files": 0,
+ "errors": [f"Target site not found: {target_site}"]
+ }
+
+ # List files in source folder
+ logger.info(f"Listing files in folder: {source_folder} for site: {source_site_info['id']}")
+ files = await connector.list_folder_contents(source_site_info["id"], source_folder)
+
+ # If no files found, try listing the root folder to see what's available
+ if not files:
+ logger.warning(f"No files found in folder '{source_folder}', trying root folder")
+ files = await connector.list_folder_contents(source_site_info["id"], "")
+
+ if files:
+ # List available folders for debugging
+ folders = [f for f in files if f.get("type") == "folder"]
+ folder_names = [f.get('name') for f in folders]
+ logger.info(f"Available folders in root: {folder_names}")
+
+ # Format folder list for better UI display
+ folder_list = ", ".join(folder_names) if folder_names else "None"
+
+ return {
+ "success": False,
+ "message": f"Folder '{source_folder}' not found. Available folders in root: {folder_list}",
+ "processed_files": 0,
+ "errors": [f"Folder '{source_folder}' not found. Available folders: {folder_list}"],
+ "available_folders": folder_names
+ }
+ else:
+ return {
+ "success": False,
+ "message": f"No files found in source folder: {source_folder}",
+ "processed_files": 0,
+ "errors": [f"No files found in source folder: {source_folder}"]
+ }
+
+ # Filter for text files only
+ text_files = [f for f in files if f.get("type") == "file" and self._is_text_file(f.get("name", ""))]
+
+ if not text_files:
+ return {
+ "success": False,
+ "message": "No text files found in source folder",
+ "processed_files": 0,
+ "errors": ["No text files found in source folder"]
+ }
+
+ # Process files in parallel for better performance
+ processed_files = []
+ errors = []
+
+ # Create tasks for parallel processing
+ async def process_single_file(file_info):
+ """Process a single file - download, neutralize, upload"""
+ try:
+ # Download file
+ file_content = await connector.download_file(source_site_info["id"], file_info["id"])
+ if not file_content:
+ return {"error": f"Failed to download file: {file_info['name']}"}
+
+ # Convert to text
+ try:
+ text_content = file_content.decode('utf-8')
+ except UnicodeDecodeError:
+ text_content = file_content.decode('latin-1')
+
+ # Neutralize the text
+ neutralization_result = self.app_interface.neutralizeText(text_content, file_info["id"])
+
+ # Create neutralized filename
+ neutralized_filename = f"neutralized_{file_info['name']}"
+
+ # Upload neutralized file
+ neutralized_content = neutralization_result["neutralized_text"].encode('utf-8')
+ upload_result = await connector.upload_file(
+ target_site_info["id"],
+ target_folder,
+ neutralized_filename,
+ neutralized_content
+ )
+
+ if "error" in upload_result:
+ return {"error": f"Failed to upload neutralized file: {neutralized_filename} - {upload_result['error']}"}
+ else:
+ return {
+ "success": True,
+ "original_name": file_info["name"],
+ "neutralized_name": neutralized_filename,
+ "attributes_count": len(neutralization_result.get("attributes", []))
+ }
+
+ except Exception as e:
+ error_msg = f"Error processing file {file_info['name']}: {str(e)}"
+ logger.error(error_msg)
+ return {"error": error_msg}
+
+ # Process all files in parallel
+ logger.info(f"Processing {len(text_files)} files in parallel...")
+ tasks = [process_single_file(file_info) for file_info in text_files]
+ results = await asyncio.gather(*tasks, return_exceptions=True)
+
+ # Process results
+ for i, result in enumerate(results):
+ if isinstance(result, Exception):
+ error_msg = f"Exception processing file {text_files[i]['name']}: {str(result)}"
+ errors.append(error_msg)
+ logger.error(error_msg)
+ elif isinstance(result, dict) and "error" in result:
+ errors.append(result["error"])
+ elif isinstance(result, dict) and result.get("success"):
+ processed_files.append({
+ "original_name": result["original_name"],
+ "neutralized_name": result["neutralized_name"],
+ "attributes_count": result["attributes_count"]
+ })
+ logger.info(f"Successfully processed file: {result['original_name']} -> {result['neutralized_name']}")
+ else:
+ error_msg = f"Unknown result processing file {text_files[i]['name']}: {result}"
+ errors.append(error_msg)
+ logger.error(error_msg)
+
+ return {
+ "success": len(processed_files) > 0,
+ "message": f"Processed {len(processed_files)} files successfully",
+ "processed_files": len(processed_files),
+ "files": processed_files,
+ "errors": errors
+ }
+
+ except Exception as e:
+ logger.error(f"Error in async SharePoint processing: {str(e)}")
+ return {
+ "success": False,
+ "message": f"Error in async SharePoint processing: {str(e)}",
+ "processed_files": 0,
+ "errors": [str(e)]
+ }
+
+    def _parse_sharepoint_path(self, path: str) -> Tuple[Optional[str], Optional[str]]:
+ """Parse SharePoint path to extract site URL and folder path"""
+ try:
+ # Expected format: https://domain.sharepoint.com/sites/sitename/folder/path
+ if not path.startswith("https://"):
+ return None, None
+
+ # Remove query parameters
+ if "?" in path:
+ path = path.split("?")[0]
+
+ # Split by /sites/
+ if "/sites/" not in path:
+ return None, None
+
+ parts = path.split("/sites/", 1)
+ if len(parts) != 2:
+ return None, None
+
+ # Extract domain and site name
+ domain = parts[0].replace("https://", "")
+ site_name = parts[1].split("/")[0]
+
+ # Create proper site URL for Graph API
+ site_url = f"https://{domain}/sites/{site_name}"
+
+ # Extract folder path (everything after the site name)
+ folder_parts = parts[1].split("/")[1:]
+ folder_path = "/".join(folder_parts) if folder_parts else ""
+
+ # URL decode the folder path
+            folder_path = unquote(folder_path)
+
+ return site_url, folder_path
+
+ except Exception as e:
+ logger.error(f"Error parsing SharePoint path '{path}': {str(e)}")
+ return None, None
+
+ def _is_text_file(self, filename: str) -> bool:
+ """Check if file is a text file based on extension"""
+        text_extensions = [
+            '.txt', '.csv', '.json', '.xml', '.md', '.log',
+            '.rtf',  # plain-text markup (binary containers like .doc/.docx/.odt are excluded)
+            '.html', '.htm', '.css', '.js', '.ts', '.py', '.java', '.cpp', '.c', '.h',  # Code files
+            '.ini', '.cfg', '.conf', '.properties',  # Config files
+            '.sql', '.yaml', '.yml', '.toml',  # Data/config files
+            '.ps1', '.bat', '.sh', '.bash'  # Script files
+        ]
+ return any(filename.lower().endswith(ext) for ext in text_extensions)
+
+ def process_file_content(self, file_content: bytes, file_name: str, mime_type: str) -> Dict[str, Any]:
+ """
+ Process file content for neutralization
+
+ Args:
+ file_content: Binary file content
+ file_name: Name of the file
+ mime_type: MIME type of the file
+
+ Returns:
+ Dictionary with neutralization results
+ """
+ try:
+ # Decode content to text
+ try:
+ text_content = file_content.decode('utf-8')
+ except UnicodeDecodeError:
+ # Try with different encodings
+ for encoding in ['latin-1', 'cp1252', 'iso-8859-1']:
+ try:
+ text_content = file_content.decode(encoding)
+ break
+ except UnicodeDecodeError:
+ continue
+ else:
+ raise ValueError("Unable to decode file content")
+
+ # Generate a temporary file ID for tracking
+ temp_file_id = str(uuid.uuid4())
+
+ # Neutralize the content
+ neutralization_result = self.neutralize_text(text_content, temp_file_id)
+
+ # Encode the neutralized content back to bytes
+ neutralized_content = neutralization_result["neutralized_text"].encode('utf-8')
+
+ # Generate neutralized file name
+ neutralized_file_name = f"neutralized_{file_name}"
+
+ return {
+ "success": True,
+ "original_content": text_content,
+ "neutralized_content": neutralization_result["neutralized_text"],
+ "neutralized_file_name": neutralized_file_name,
+ "attributes": neutralization_result["attributes"],
+ "mapping": neutralization_result["mapping"],
+ "file_id": temp_file_id
+ }
+
+ except Exception as e:
+ logger.error(f"Error processing file content: {str(e)}")
+ return {
+ "success": False,
+ "error": str(e),
+ "original_content": None,
+ "neutralized_content": None
+ }
+
+ def _get_content_type_from_mime(self, mime_type: str) -> str:
+ """Determine content type from MIME type for neutralization processing"""
+ if mime_type.startswith('text/'):
+ return 'text'
+ elif mime_type in ['application/json', 'application/xml', 'text/xml']:
+ return 'json' if 'json' in mime_type else 'xml'
+ elif mime_type in ['text/csv', 'application/csv']:
+ return 'csv'
+ else:
+ return 'text' # Default to text processing
+
+ def batch_neutralize_files(self, files_data: List[Dict[str, Any]]) -> Dict[str, Any]:
+ """
+ Process multiple files for neutralization
+
+ Args:
+ files_data: List of dictionaries containing file information
+ Each dict should have: content, name, mime_type
+
+ Returns:
+ Dictionary with batch processing results
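+
+        Example input item (illustrative):
+            {"content": b"Call Anna", "name": "note.txt", "mime_type": "text/plain"}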
+ """
+ try:
+ results = []
+ total_files = len(files_data)
+ successful_files = 0
+ errors = []
+
+ for file_data in files_data:
+ try:
+ result = self.process_file_content(
+ file_data['content'],
+ file_data['name'],
+ file_data['mime_type']
+ )
+
+ if result['success']:
+ successful_files += 1
+ results.append({
+ 'file_name': file_data['name'],
+ 'neutralized_file_name': result['neutralized_file_name'],
+ 'file_id': result['file_id'],
+ 'attributes_count': len(result['attributes'])
+ })
+ else:
+ errors.append(f"Failed to process {file_data['name']}: {result['error']}")
+
+ except Exception as e:
+ error_msg = f"Error processing {file_data['name']}: {str(e)}"
+ errors.append(error_msg)
+ logger.error(error_msg)
+
+ return {
+ "success": len(errors) == 0,
+ "total_files": total_files,
+ "successful_files": successful_files,
+ "failed_files": len(errors),
+ "results": results,
+ "errors": errors
+ }
+
+ except Exception as e:
+ logger.error(f"Error in batch neutralization: {str(e)}")
+ return {
+ "success": False,
+ "total_files": len(files_data),
+ "successful_files": 0,
+ "failed_files": len(files_data),
+ "results": [],
+ "errors": [str(e)]
+ }
+
+ def cleanup_file_attributes(self, file_id: str) -> bool:
+ """Clean up neutralization attributes for a specific file"""
+ return self.app_interface.deleteNeutralizationAttributes(file_id)
+
+ def get_processing_stats(self) -> Dict[str, Any]:
+ """Get statistics about neutralization processing"""
+ try:
+ # Get all attributes for the current mandate
+ all_attributes = self.get_attributes()
+
+ # Group by pattern type
+ pattern_counts = {}
+ for attr in all_attributes:
+ pattern_type = attr.patternType
+ pattern_counts[pattern_type] = pattern_counts.get(pattern_type, 0) + 1
+
+ # Get unique files
+ unique_files = set(attr.fileId for attr in all_attributes if attr.fileId)
+
+ return {
+ "total_attributes": len(all_attributes),
+ "unique_files": len(unique_files),
+ "pattern_counts": pattern_counts,
+ "mandate_id": self.current_user.mandateId
+ }
+
+ except Exception as e:
+ logger.error(f"Error getting processing stats: {str(e)}")
+ return {
+ "total_attributes": 0,
+ "unique_files": 0,
+ "pattern_counts": {},
+ "error": str(e)
+ }
diff --git a/tests/test_neutralizer/neutralizer.py b/tests/test_neutralizer/neutralizer.py
index 9646d632..0691cd88 100644
--- a/tests/test_neutralizer/neutralizer.py
+++ b/tests/test_neutralizer/neutralizer.py
@@ -129,11 +129,7 @@ class DataAnonymizer:
if matched_text not in self.mapping:
self.mapping[matched_text] = pattern.replacement_template.format(len(self.mapping) + 1)
replacement = self.mapping[matched_text]
-
- if pattern_name == 'email':
- print(f"DEBUG: Replacing email '{matched_text}' with '{replacement}'")
- print(f"DEBUG: Text after replacement: {current_text[:start] + replacement + current_text[end:]}")
-
+
# Replace the matched text while preserving surrounding whitespace
current_text = current_text[:start] + replacement + current_text[end:]
diff --git a/tests/test_neutralizer/patterns.py b/tests/test_neutralizer/patterns.py
index 4cfbed93..e5ae46e3 100644
--- a/tests/test_neutralizer/patterns.py
+++ b/tests/test_neutralizer/patterns.py
@@ -392,11 +392,6 @@ def find_patterns_in_text(text: str, patterns: List[Pattern]) -> List[tuple]:
matches = []
for pattern in patterns:
for p in pattern.patterns:
- if pattern.name == 'email':
- print(f"\nDEBUG: Checking email pattern '{p}'")
         for match in re.finditer(p, text, re.IGNORECASE):
- if pattern.name == 'email':
- print(f"DEBUG: Found email match: '{match.group(0)}' at position {match.start()}-{match.end()}")
- print(f"DEBUG: Context: '{text[max(0, match.start()-20):match.end()+20]}'")
matches.append((pattern.name, match.group(0), match.start(), match.end()))
return sorted(matches, key=lambda x: x[2]) # Sort by start position
\ No newline at end of file