# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Data Neutralization Service

Handles file processing for data neutralization, including SharePoint integration.
GDPR (DSGVO) compliant data neutralizer for AI agent systems.
Supports TXT, JSON, CSV, PDF, DOCX, XLSX, PPTX (extract -> neutralize -> generate).
Multilingual: DE, EN, FR, IT
"""

import asyncio
import json
import logging
import re
from typing import Dict, List, Any, Optional

from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig, DataNeutralizerAttributes
from modules.features.neutralization.interfaceFeatureNeutralizer import InterfaceFeatureNeutralizer, getInterface as getNeutralizerInterface

# Import all necessary classes and functions for neutralization
from .subProcessCommon import CommonUtils, NeutralizationResult, NeutralizationAttribute
from .subProcessText import TextProcessor, PlainText
from .subProcessList import ListProcessor, TableData
from .subProcessBinary import BinaryProcessor
from .subProcessPdfInPlace import neutralize_pdf_in_place
from .subPatterns import HeaderPatterns, DataPatterns, TextTablePatterns
from .subContentPartAdapter import content_parts_to_renderer_schema

logger = logging.getLogger(__name__)

# MIME types that can be processed via extract -> neutralize -> generate.
# Lookups against this set use the lower-cased MIME type.
EXTRACTABLE_BINARY_MIME_TYPES = frozenset({
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
})

class NeutralizationService:
    """Service for handling data neutralization operations.

    Neutralizes raw text, text-based files and extractable binary documents
    (PDF, DOCX, XLSX, PPTX) by replacing sensitive values with placeholders of
    the form ``[<type>.<uuid>]``. The original-text -> placeholder mapping is
    persisted through the neutralizer interface so that ``resolveText`` can
    restore the original values later.
    """

    # Placeholder format "[<type>.<uuid>]": one pattern for searching inside a
    # text, one anchored pattern for validating a single placeholder value.
    _PLACEHOLDER_FIND_RE = re.compile(r'\[([a-z]+)\.([a-f0-9-]{36})\]')
    _PLACEHOLDER_EXACT_RE = re.compile(r'^\[([a-z]+)\.([a-f0-9-]{36})\]$')

    # Encodings tried after UTF-8. cp1252 must come before latin-1: latin-1
    # accepts every byte sequence, so anything listed after it is never tried.
    _FALLBACK_ENCODINGS = ('cp1252', 'latin-1')

    # MIME type -> processor routing key used by _neutralizeText.
    _MIME_CONTENT_TYPES = {
        'text/csv': 'csv',
        'application/csv': 'csv',
        'application/json': 'json',
        'text/json': 'json',
        'application/xml': 'xml',
        'text/xml': 'xml',
    }

    # MIME prefixes treated as binary (not neutralizable as text).
    _BINARY_MIME_PREFIXES = (
        'image/', 'audio/', 'video/',
        'application/pdf', 'application/zip',
        'application/octet-stream', 'application/x-',
        'application/vnd.', 'application/msword',
    )
    # Exact binary types that are not already covered by a prefix above.
    _BINARY_MIME_TYPES = frozenset({'application/gzip'})

    def __init__(self, serviceCenter=None, NamesToParse: Optional[List[str]] = None):
        """Initialize the service with user context and anonymization processors.

        Args:
            serviceCenter: Service center instance for accessing other services.
                May be None; in that case no DB interfaces are available and
                file/config operations are limited accordingly.
            NamesToParse: List of names to parse and replace (case-insensitive)
        """
        self.services = serviceCenter
        # getattr keeps the documented default (serviceCenter=None) from crashing.
        self.interfaceDbComponent = getattr(serviceCenter, 'interfaceDbComponent', None)

        # Create feature-specific interface for neutralizer DB operations
        self.interfaceNeutralizer: Optional[InterfaceFeatureNeutralizer] = None
        if serviceCenter and serviceCenter.interfaceDbApp:
            dbApp = serviceCenter.interfaceDbApp
            self.interfaceNeutralizer = getNeutralizerInterface(
                currentUser=dbApp.currentUser,
                mandateId=serviceCenter.mandateId or dbApp.mandateId,
                featureInstanceId=getattr(serviceCenter, 'featureInstanceId', None) or getattr(dbApp, 'featureInstanceId', None)
            )

        # Initialize anonymization processors with the normalized name list
        self.NamesToParse = NamesToParse or []
        self.textProcessor = TextProcessor(self.NamesToParse)
        self.listProcessor = ListProcessor(self.NamesToParse)
        self.binaryProcessor = BinaryProcessor()
        self.commonUtils = CommonUtils()

    def getConfig(self) -> Optional["DataNeutraliserConfig"]:
        """Get the neutralization configuration for the current user's mandate.

        Returns:
            The configuration from the neutralizer interface, or None when no
            neutralizer interface is available.
        """
        if not self.interfaceNeutralizer:
            return None
        return self.interfaceNeutralizer.getNeutralizationConfig()

    def saveConfig(self, configData: Dict[str, Any]) -> "DataNeutraliserConfig":
        """Save or update the neutralization configuration.

        Raises:
            ValueError: If no neutralizer interface (user context) is available.
        """
        if not self.interfaceNeutralizer:
            raise ValueError("User context required for saving configuration")
        return self.interfaceNeutralizer.createOrUpdateNeutralizationConfig(configData)

    # Public API: process text or file

    def processText(self, text: str) -> Dict[str, Any]:
        """Neutralize a raw text string and return a standard result dict.

        The resulting mapping is persisted without a file reference.
        """
        result = self._neutralizeText(text, 'text')
        self._persistAttributes(result.get('mapping', {}), None)
        return result

    def processFile(self, fileId: str) -> Dict[str, Any]:
        """Neutralize a file referenced by its fileId using the component interface.

        Supports text files directly; PDF/DOCX/XLSX/PPTX via
        extract -> neutralize -> generate. Other binary types are reported as
        skipped.

        Note: the extractable-binary path uses ``asyncio.run`` and therefore
        cannot run inside an already running event loop; in that case the
        failure is reported in the returned ``processed_info``.

        Raises:
            ValueError: If the component interface is missing or no file data
                exists for ``fileId``.
        """
        if not self.interfaceDbComponent:
            raise ValueError("Component interface is required to process a file by fileId")

        # Metadata is best-effort: without it the file is handled as plain text.
        try:
            fileInfo = self.interfaceDbComponent.getFile(fileId)
        except Exception as e:
            logger.warning(f"Could not load file metadata for fileId {fileId}: {e}")
            fileInfo = None
        fileName = getattr(fileInfo, 'fileName', None) if fileInfo else None
        mimeType = getattr(fileInfo, 'mimeType', None) if fileInfo else None

        fileData = self.interfaceDbComponent.getFileData(fileId)
        if not fileData:
            raise ValueError(f"No file data found for fileId: {fileId}")

        mime_lower = (mimeType or '').lower()

        # Binary but extractable: PDF, DOCX, XLSX, PPTX
        if mime_lower in EXTRACTABLE_BINARY_MIME_TYPES:
            try:
                result = asyncio.run(self._processBinaryFile(fileData, fileName or "document", mime_lower, fileId))
                if result:
                    result['file_id'] = fileId
                    result['neutralized_file_name'] = f"neutralized_{fileName}" if fileName else "neutralized_document"
                    return result
            except Exception as e:
                logger.error(f"Binary file neutralization failed: {str(e)}")
                return {
                    'file_id': fileId,
                    'is_binary': True,
                    'mime_type': mimeType or 'unknown',
                    'file_name': fileName or 'unknown',
                    'neutralized_text': None,
                    'processed_info': {'type': 'binary', 'status': 'error', 'error': str(e)}
                }

        # Binary but not extractable
        if self._isBinaryMimeType(mimeType or ''):
            return {
                'file_id': fileId,
                'is_binary': True,
                'mime_type': mimeType or 'unknown',
                'file_name': fileName or 'unknown',
                'neutralized_text': None,
                'processed_info': {'type': 'binary', 'status': 'skipped', 'message': 'File type not supported for neutralization'}
            }

        # Text-based file
        textType = self._getContentTypeFromMime(mimeType or '')
        textContent = self._decodeFileText(fileData)

        result = self._neutralizeText(textContent, textType)
        self._persistAttributes(result.get('mapping', {}), fileId)
        if fileName:
            result['neutralized_file_name'] = f"neutralized_{fileName}"
        result['file_id'] = fileId
        result['is_binary'] = False
        return result

    def processBinaryBytes(self, fileBytes: bytes, fileName: str, mimeType: str) -> Dict[str, Any]:
        """Neutralize binary file bytes (sync - use from sync callers).

        Uses ``asyncio.run``, so it must not be called while an event loop is
        running; use ``processBinaryBytesAsync`` from async code instead.
        Failures are reported in ``processed_info`` rather than raised.
        """
        mime_lower = (mimeType or '').lower()
        if mime_lower not in EXTRACTABLE_BINARY_MIME_TYPES:
            return self._binaryFailure('skipped', 'message', 'File type not supported')
        try:
            return asyncio.run(self._processBinaryFile(fileBytes, fileName, mime_lower, None))
        except Exception as e:
            logger.error(f"Binary neutralization failed: {str(e)}")
            return self._binaryFailure('error', 'error', str(e))

    async def processBinaryBytesAsync(self, fileBytes: bytes, fileName: str, mimeType: str) -> Dict[str, Any]:
        """Neutralize binary file bytes (async - use from async routes to avoid event loop conflict).

        Failures are reported in ``processed_info`` rather than raised.
        """
        mime_lower = (mimeType or '').lower()
        if mime_lower not in EXTRACTABLE_BINARY_MIME_TYPES:
            return self._binaryFailure('skipped', 'message', 'File type not supported')
        try:
            return await self._processBinaryFile(fileBytes, fileName, mime_lower, None)
        except Exception as e:
            logger.error(f"Binary neutralization failed: {str(e)}")
            return self._binaryFailure('error', 'error', str(e))

    def resolveText(self, text: str) -> str:
        """Replace known ``[<type>.<uuid>]`` placeholders in ``text`` with their original text.

        Placeholders without a stored attribute are left untouched. Without a
        neutralizer interface, or when any lookup fails, the input text is
        returned unchanged.
        """
        if not self.interfaceNeutralizer:
            return text
        try:
            resolvedText = text
            # dict.fromkeys de-duplicates while keeping first-seen order, so each
            # placeholder triggers a single lookup (str.replace handles all occurrences).
            for placeholderType, uid in dict.fromkeys(self._PLACEHOLDER_FIND_RE.findall(text)):
                attribute = self.interfaceNeutralizer.getAttributeById(uid)
                if not attribute:
                    continue
                try:
                    originalText = attribute["originalText"]
                except TypeError:
                    # Attribute is an object rather than a dict-like record
                    originalText = getattr(attribute, "originalText", None)
                if originalText is None:
                    continue
                resolvedText = resolvedText.replace(f"[{placeholderType}.{uid}]", originalText)
            return resolvedText
        except Exception as e:
            logger.warning(f"Could not resolve placeholders: {e}")
            return text

    def getAttributes(self) -> List["DataNeutralizerAttributes"]:
        """Get all neutralization attributes for the current user's mandate.

        Returns an empty list when no neutralizer interface is available or the
        lookup fails.
        """
        if not self.interfaceNeutralizer:
            return []
        try:
            # Use the interface method which properly converts dicts to objects
            return self.interfaceNeutralizer.getNeutralizationAttributes()
        except Exception as e:
            logger.error(f"Error getting neutralization attributes: {str(e)}")
            return []

    def deleteNeutralizationAttributes(self, fileId: str) -> bool:
        """Delete neutralization attributes for a specific file.

        Returns False when no neutralizer interface is available.
        """
        if not self.interfaceNeutralizer:
            return False
        return self.interfaceNeutralizer.deleteNeutralizationAttributes(fileId)

    def _persistAttributes(self, mapping: Dict[str, str], fileId: Optional[str]) -> None:
        """Persist mapping to DB for resolve to work.

        Args:
            mapping: originalText -> placeholder, e.g. '[email.<uuid>]'. Entries
                whose value is not a well-formed placeholder are ignored.
            fileId: File the attributes belong to, or None for raw text.
        """
        if not self.interfaceNeutralizer or not mapping:
            return
        for original_text, placeholder in mapping.items():
            m = self._PLACEHOLDER_EXACT_RE.match(placeholder)
            if not m:
                continue
            pattern_type, uid = m.group(1), m.group(2)
            try:
                self.interfaceNeutralizer.createAttribute(
                    attributeId=uid,
                    originalText=original_text,
                    patternType=pattern_type,
                    fileId=fileId
                )
            except Exception as e:
                # Best effort, but a lost attribute means the placeholder can no
                # longer be resolved - make that visible in the logs.
                logger.warning(f"Could not persist attribute {uid}: {e}")

    @staticmethod
    def _binaryFailure(status: str, detailKey: str, detail: str) -> Dict[str, Any]:
        """Build the standard result dict for a binary file that produced no output."""
        return {
            'neutralized_text': None,
            'neutralized_bytes': None,
            'is_binary': True,
            'processed_info': {'type': 'binary', 'status': status, detailKey: detail}
        }

    @classmethod
    def _decodeFileText(cls, fileData) -> str:
        """Decode file content to text: UTF-8 first, then the fallback encodings.

        Raises:
            ValueError: If no encoding can decode the content (cannot happen
                while latin-1 is the last fallback, kept as a safety net).
        """
        if isinstance(fileData, str):
            return fileData
        try:
            return fileData.decode('utf-8')
        except UnicodeDecodeError:
            pass
        for enc in cls._FALLBACK_ENCODINGS:
            try:
                return fileData.decode(enc)
            except UnicodeDecodeError:
                continue
        raise ValueError("Unable to decode file content as text.")

    def _neutralizeParts(self, parts):
        """Neutralize every text/table part; binary, image and empty parts pass through.

        Returns:
            Tuple (mapping, neutralized_parts, error) where ``error`` is the
            last reported neutralization error message or None.
        """
        all_mapping: Dict[str, str] = {}
        neutralized_parts: List[Any] = []
        neutralization_error: Optional[str] = None
        for part in parts:
            if isinstance(part, dict):
                p = part
            elif hasattr(part, 'model_dump'):
                p = part.model_dump()
            else:
                p = part
            type_group = p.get('typeGroup', '')
            data = p.get('data', '')
            if type_group in ('binary', 'image') or not (data and str(data).strip()):
                neutralized_parts.append(part)
                continue
            # NOTE(review): _neutralizeText reloads the config and recreates the
            # processors on every call, i.e. once per part - confirm this is intended.
            nr = self._neutralizeText(str(data), 'text' if type_group != 'table' else 'csv')
            proc = nr.get('processed_info', {}) or {}
            if isinstance(proc, dict) and proc.get('type') == 'error':
                neutralization_error = proc.get('error', 'Neutralization failed')
            all_mapping.update(nr.get('mapping', {}))
            neutralized_parts.append({**p, 'data': nr.get('neutralized_text', str(data))})
        return all_mapping, neutralized_parts, neutralization_error

    @staticmethod
    def _extractDocumentData(doc):
        """Read ``documentData`` from a rendered document (dict, Pydantic model or plain object)."""
        if isinstance(doc, dict):
            return doc.get('documentData')
        if hasattr(doc, 'model_dump'):
            # Pydantic v2 models may need model_dump() for reliable access
            return doc.model_dump(mode='python').get('documentData')
        return getattr(doc, 'documentData', None)

    async def _processBinaryFile(
        self,
        fileBytes: bytes,
        fileName: str,
        mimeType: str,
        fileId: Optional[str]
    ) -> Dict[str, Any]:
        """Extract -> neutralize -> adapt -> generate for PDF/DOCX/XLSX/PPTX.

        PDFs are neutralized in place only (no re-render fallback) and abort
        when any part failed to neutralize. Other formats are re-rendered from
        the neutralized parts.

        Args:
            fileBytes: Raw document bytes.
            fileName: Name used for extraction, rendering and the output name.
            mimeType: Lower-cased MIME type of the document.
            fileId: File the persisted attributes belong to, or None.

        Returns:
            Result dict with ``neutralized_bytes`` on success, or a failure
            dict whose ``processed_info`` carries status and error.

        Raises:
            Exception: Any exception raised while rendering is logged and re-raised.
        """
        from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
        from modules.services.serviceExtraction.subPipeline import runExtraction
        from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy

        # Ensure registries exist (constructing the service is expected to
        # populate the shared class-level registries)
        if ExtractionService._sharedExtractorRegistry is None:
            ExtractionService(self.services)
        registry = ExtractionService._sharedExtractorRegistry
        chunker = ExtractionService._sharedChunkerRegistry
        opts = ExtractionOptions(prompt="neutralize", mergeStrategy=MergeStrategy(preserveChunks=True))

        # 1. Extract
        extracted = runExtraction(registry, chunker, fileBytes, fileName, mimeType, opts)
        parts = extracted.parts if hasattr(extracted, 'parts') else []
        if not parts:
            return self._binaryFailure('error', 'error', 'No content extracted')

        # 2. Neutralize each text/table part
        all_mapping, neutralized_parts, neutralization_error = self._neutralizeParts(parts)
        self._persistAttributes(all_mapping, fileId)
        attributes = [{'original': k, 'placeholder': v} for k, v in all_mapping.items()]

        # 3. PDF: Use in-place only; no fallback to render
        if mimeType == "application/pdf":
            if neutralization_error:
                logger.error(f"PDF neutralization aborted: {neutralization_error}")
                return self._binaryFailure('error', 'error', neutralization_error)
            in_place_bytes = neutralize_pdf_in_place(fileBytes, all_mapping)
            if in_place_bytes is None:
                logger.error("PDF in-place neutralization failed")
                return self._binaryFailure('error', 'error', 'PDF in-place neutralization failed')
            logger.info("PDF neutralization completed via in-place redaction (layout preserved)")
            return {
                'neutralized_text': None,
                'neutralized_bytes': in_place_bytes,
                'neutralized_file_name': f"neutralized_{fileName}",
                'is_binary': True,
                'mime_type': 'application/pdf',
                'attributes': attributes,
                'processed_info': {'type': 'binary', 'status': 'success', 'format': 'pdf', 'method': 'in-place'}
            }

        if neutralization_error:
            # Non-PDF output is still rendered; surface the problem in the logs.
            logger.warning(f"Neutralization reported an error for at least one part of {mimeType}: {neutralization_error}")

        # 4. Adapter: ContentPart list -> renderer schema (non-PDF only)
        schema = content_parts_to_renderer_schema(neutralized_parts, title=fileName or "Neutralized")

        # 5. Render to format
        renderer, output_mime = self._getRendererForMime(mimeType)
        if not renderer:
            return self._binaryFailure('error', 'error', f'No renderer for {mimeType}')

        try:
            logger.info(f"Calling renderer.render for mime={mimeType}, renderer={type(renderer).__name__}")
            rendered = await renderer.render(schema, fileName or "document", None, None)
            logger.info(f"Renderer returned: type={type(rendered).__name__}, len={len(rendered) if rendered else 0}")
            if not rendered:
                logger.error("Renderer returned empty list")
                return self._binaryFailure('error', 'error', 'Render produced no output')

            doc = rendered[0]
            logger.info(f"First doc: type={type(doc).__name__}, isinstance(dict)={isinstance(doc, dict)}, has documentData attr={hasattr(doc, 'documentData')}")
            doc_data = self._extractDocumentData(doc)
            logger.info(f"doc_data: type={type(doc_data).__name__ if doc_data is not None else 'None'}, len={len(doc_data) if doc_data else 0}")
            if doc_data is None:
                logger.error("Renderer returned document with no documentData")
                return self._binaryFailure('error', 'error', 'Renderer returned no data')
            if isinstance(doc_data, str):
                doc_data = doc_data.encode('utf-8')
            return {
                'neutralized_text': None,
                'neutralized_bytes': doc_data,
                'neutralized_file_name': f"neutralized_{fileName}",
                'is_binary': True,
                'mime_type': output_mime,
                'attributes': attributes,
                'processed_info': {'type': 'binary', 'status': 'success', 'format': mimeType}
            }
        except Exception as e:
            logger.error(f"Render failed for {mimeType}: {str(e)}", exc_info=True)
            raise

    def _getRendererForMime(self, mimeType: str):
        """Get renderer instance and output mime for the given input MIME type.

        Only the renderer that is actually needed gets imported. The output
        MIME type equals the input MIME type for every supported format.

        Returns:
            Tuple (renderer, output_mime), or (None, None) for unsupported types.
        """
        if mimeType == "application/pdf":
            from modules.services.serviceGeneration.renderers.rendererPdf import RendererPdf as rendererCls
        elif mimeType == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
            from modules.services.serviceGeneration.renderers.rendererDocx import RendererDocx as rendererCls
        elif mimeType == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
            from modules.services.serviceGeneration.renderers.rendererXlsx import RendererXlsx as rendererCls
        elif mimeType == "application/vnd.openxmlformats-officedocument.presentationml.presentation":
            from modules.services.serviceGeneration.renderers.rendererPptx import RendererPptx as rendererCls
        else:
            return None, None
        return rendererCls(self.services), mimeType

    def _reloadNamesFromConfig(self) -> None:
        """Reload names from config and update processors.

        Errors are logged and the existing names/processors are kept.
        """
        try:
            config = self.getConfig()
            if not config:
                return

            # Parse namesToParse string (one name per line) into a list
            names_list = []
            if config.namesToParse:
                names_list = [name.strip() for name in config.namesToParse.split('\n') if name.strip()]

            # Update internal list and recreate processors with updated names
            self.NamesToParse = names_list
            self.textProcessor = TextProcessor(names_list)
            self.listProcessor = ListProcessor(names_list)

            logger.debug(f"Reloaded {len(names_list)} names from config")
        except Exception as e:
            # Continue with existing names if reload fails
            logger.error(f"Error reloading names from config: {str(e)}")

    # Helper functions

    def _neutralizeText(self, text: str, textType: Optional[str] = None) -> Dict[str, Any]:
        """Process text and return unified dict for API consumption.

        Args:
            text: Content to neutralize.
            textType: 'csv', 'json', 'xml' or anything else for plain text;
                auto-detected when None.

        Returns:
            ``NeutralizationResult`` as a dict. On any exception the result has
            an empty ``neutralized_text`` and ``processed_info`` of type 'error'.
        """
        try:
            # Reload names from config before processing to ensure we have the latest names
            self._reloadNamesFromConfig()

            # Auto-detect content type if not provided
            if textType is None:
                textType = self.commonUtils.detectContentType(text)

            if self.binaryProcessor.isBinaryContent(text):
                # Content is binary data
                data, mapping, _replaced, processed_info = self.binaryProcessor.processBinaryContent(text)
                # NOTE(review): a str result is discarded in favour of the input
                # text here - presumably binary content is passed through; confirm.
                neutralized_text = text if isinstance(data, str) else str(data)
            elif textType == 'csv':
                data, mapping, _replaced, processed_info = self.listProcessor.processCsvContent(text)
                try:
                    neutralized_text = data.to_csv(index=False)
                except Exception:
                    neutralized_text = str(data)
            elif textType == 'json':
                data, mapping, _replaced, processed_info = self.listProcessor.processJsonContent(text)
                neutralized_text = json.dumps(data, ensure_ascii=False)
            elif textType == 'xml':
                data, mapping, _replaced, processed_info = self.listProcessor.processXmlContent(text)
                neutralized_text = str(data)
            else:
                data, mapping, _replaced, processed_info = self.textProcessor.processTextContent(text)
                neutralized_text = str(data)

            attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()]
            return NeutralizationResult(
                neutralized_text=neutralized_text,
                mapping=mapping,
                attributes=attributes,
                processed_info=processed_info
            ).model_dump()

        except Exception as e:
            logger.error(f"Error processing content: {str(e)}", exc_info=True)
            return NeutralizationResult(
                neutralized_text='',
                mapping={},
                attributes=[],
                processed_info={'type': 'error', 'error': str(e)}
            ).model_dump()

    def _isBinaryMimeType(self, mime_type: str) -> bool:
        """Check if a MIME type represents binary content that cannot be neutralized as text.

        Parameters (``; charset=...``), surrounding whitespace and case are
        ignored. Empty input and every ``text/*`` type count as non-binary.
        """
        if not mime_type:
            return False
        normalized = mime_type.split(';', 1)[0].strip().lower()
        if normalized.startswith('text/'):
            return False
        return normalized.startswith(self._BINARY_MIME_PREFIXES) or normalized in self._BINARY_MIME_TYPES

    def _getContentTypeFromMime(self, mime_type: str) -> str:
        """Determine content type from MIME type for neutralization processing.

        Specific types (csv/json/xml) are matched before the generic text
        default, so e.g. 'text/csv' is routed to the CSV processor.
        Parameters, surrounding whitespace and case are ignored.

        Returns:
            'csv', 'json', 'xml' or 'text' (default).
        """
        normalized = (mime_type or '').split(';', 1)[0].strip().lower()
        return self._MIME_CONTENT_TYPES.get(normalized, 'text')  # Default to text processing