# gateway/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Data Neutralization Service
Handles file processing for data neutralization including SharePoint integration
DSGVO-konformer Daten-Neutralisierer für KI-Agentensysteme
Supports TXT, JSON, CSV, PDF, DOCX, XLSX, PPTX (extract -> neutralize -> generate)
Mehrsprachig: DE, EN, FR, IT
"""
import asyncio
import logging
import re
import json
from typing import Dict, List, Any, Optional
from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig, DataNeutralizerAttributes
from modules.features.neutralization.interfaceFeatureNeutralizer import InterfaceFeatureNeutralizer, getInterface as getNeutralizerInterface
# Import all necessary classes and functions for neutralization
from .subProcessCommon import CommonUtils, NeutralizationResult, NeutralizationAttribute
from .subProcessText import TextProcessor, PlainText
from .subProcessList import ListProcessor, TableData
from .subProcessBinary import BinaryProcessor
from .subProcessPdfInPlace import neutralize_pdf_in_place
from .subPatterns import HeaderPatterns, DataPatterns, TextTablePatterns
from .subContentPartAdapter import content_parts_to_renderer_schema
logger = logging.getLogger(__name__)
# MIME types that can be processed via extract -> neutralize -> generate
EXTRACTABLE_BINARY_MIME_TYPES = frozenset({
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
})
class NeutralizationService:
"""Service for handling data neutralization operations"""
    def __init__(self, serviceCenter=None, getServiceFn=None, NamesToParse: Optional[List[str]] = None):
"""Initialize the service with user context and anonymization processors
Args:
serviceCenter: Service center context or legacy service center instance
getServiceFn: Service resolver function (injected by ServiceCenter resolver)
NamesToParse: List of names to parse and replace (case-insensitive)
"""
self.services = serviceCenter
self._getService = getServiceFn
self.interfaceDbComponent = getattr(serviceCenter, "interfaceDbComponent", None)
# Create feature-specific interface for neutralizer DB operations
        self.interfaceNeutralizer: Optional[InterfaceFeatureNeutralizer] = None
if serviceCenter and getattr(serviceCenter, "interfaceDbApp", None):
dbApp = serviceCenter.interfaceDbApp
self.interfaceNeutralizer = getNeutralizerInterface(
currentUser=dbApp.currentUser,
mandateId=serviceCenter.mandateId or dbApp.mandateId,
featureInstanceId=getattr(serviceCenter, 'featureInstanceId', None) or getattr(dbApp, 'featureInstanceId', None)
)
elif serviceCenter and getattr(serviceCenter, "user", None):
self.interfaceNeutralizer = getNeutralizerInterface(
currentUser=serviceCenter.user,
mandateId=getattr(serviceCenter, 'mandateId', None) or getattr(serviceCenter, 'mandate_id', None),
featureInstanceId=getattr(serviceCenter, 'featureInstanceId', None) or getattr(serviceCenter, 'feature_instance_id', None),
)
namesList = NamesToParse if isinstance(NamesToParse, list) else []
self.NamesToParse = namesList
self.textProcessor = TextProcessor(namesList)
self.listProcessor = ListProcessor(namesList)
self.binaryProcessor = BinaryProcessor()
self.commonUtils = CommonUtils()
def getConfig(self) -> Optional[DataNeutraliserConfig]:
"""Get the neutralization configuration for the current user's mandate"""
if not self.interfaceNeutralizer:
return None
return self.interfaceNeutralizer.getNeutralizationConfig()
def saveConfig(self, configData: Dict[str, Any]) -> DataNeutraliserConfig:
"""Save or update the neutralization configuration"""
if not self.interfaceNeutralizer:
raise ValueError("User context required for saving configuration")
return self.interfaceNeutralizer.createOrUpdateNeutralizationConfig(configData)
    # AI text neutralization: prompt template, chunking constants, and the public text/file API
_NEUT_INSTRUCTION = (
"Analyze the following text and identify ALL sensitive content that must be neutralized:\n"
"1. Personal data (PII): names of persons, email addresses, phone numbers, "
"physical addresses, ID numbers, dates of birth, financial data (IBAN, account numbers), "
"social security numbers\n"
"2. Protected business logic: proprietary algorithms, trade secrets, confidential "
"processes, internal procedures, code snippets that reveal implementation details\n"
"3. Named entities: company names, product names, project names, brand names\n\n"
"Return ONLY a JSON array (no markdown, no explanation):\n"
'[{"text":"exact substring","type":"name|email|phone|address|id|financial|logic|company|product|location|other"}]\n\n'
"Rules:\n"
"- Every entry's 'text' must be an exact, verbatim substring of the input.\n"
"- Do NOT include generic words, common language constructs or non-sensitive terms.\n"
"- If nothing is sensitive, return [].\n\n"
)
    # Heuristics mirroring aicoreModelSelector: roughly 3 bytes per token,
    # prompts capped at 80% of the model's context window, and a 10% safety
    # margin applied to the derived chunk size.
    _BYTES_PER_TOKEN = 3
    _SELECTOR_MAX_RATIO = 0.8
    _CHUNK_SAFETY_MARGIN = 0.9
def _resolveNeutModel(self):
"""Query the model registry for the best NEUTRALIZATION_TEXT model.
Returns the model object (with contextLength etc.) or None."""
try:
from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.aicore.aicoreModelSelector import modelSelector as _modSel
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
_models = modelRegistry.getAvailableModels()
_opts = AiCallOptions(operationType=OperationTypeEnum.NEUTRALIZATION_TEXT)
_failover = _modSel.getFailoverModelList("x", "", _opts, _models)
return _failover[0] if _failover else None
except Exception as _e:
logger.warning(f"_resolveNeutModel failed: {_e}")
return None
def _calcMaxChunkChars(self, model) -> int:
"""Derive the maximum text-chunk size (in characters) from the selected
model's contextLength, mirroring the rules in aicoreModelSelector:
promptTokens = promptBytes / 3 must be <= contextLength * 0.8
Subtract the instruction overhead and apply a safety margin."""
if not model or getattr(model, 'contextLength', 0) <= 0:
return 5000
_instructionBytes = len(self._NEUT_INSTRUCTION.encode('utf-8')) + 30
_maxPromptBytes = int(model.contextLength * self._SELECTOR_MAX_RATIO * self._BYTES_PER_TOKEN)
_maxChunkChars = int((_maxPromptBytes - _instructionBytes) * self._CHUNK_SAFETY_MARGIN)
return max(_maxChunkChars, 500)
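    # Worked example (illustrative numbers): for a model with contextLength=8192,
    # _maxPromptBytes = int(8192 * 0.8 * 3) = 19660; assuming ~1000 bytes of
    # instruction overhead, _maxChunkChars = int((19660 - 1000) * 0.9) = 16794.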
@staticmethod
def _splitTextIntoChunks(text: str, maxChars: int) -> List[str]:
"""Split *text* into chunks of at most *maxChars*, preferring paragraph
then sentence boundaries so that the LLM sees coherent blocks."""
if len(text) <= maxChars:
return [text]
chunks: List[str] = []
remaining = text
while remaining:
if len(remaining) <= maxChars:
chunks.append(remaining)
break
_cut = maxChars
_para = remaining.rfind("\n\n", 0, _cut)
if _para > maxChars // 3:
_cut = _para + 2
else:
_nl = remaining.rfind("\n", 0, _cut)
if _nl > maxChars // 3:
_cut = _nl + 1
else:
_dot = remaining.rfind(". ", 0, _cut)
if _dot > maxChars // 3:
_cut = _dot + 2
else:
_sp = remaining.rfind(" ", 0, _cut)
if _sp > maxChars // 3:
_cut = _sp + 1
chunks.append(remaining[:_cut])
remaining = remaining[_cut:]
return chunks
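    # Doctest-style example (illustrative):
    #   >>> NeutralizationService._splitTextIntoChunks("para one\n\npara two", 12)
    #   ['para one\n\n', 'para two']
    # The cut lands after the "\n\n" because the boundary search prefers a
    # paragraph break found past the first third of the window.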
async def _analyseChunk(self, aiService, chunkText: str) -> List[dict]:
"""Send one chunk to the NEUTRALIZATION_TEXT model, return raw findings list."""
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
_prompt = self._NEUT_INSTRUCTION + "Text to analyze:\n---\n" + chunkText + "\n---"
_request = AiCallRequest(
prompt=_prompt,
options=AiCallOptions(operationType=OperationTypeEnum.NEUTRALIZATION_TEXT),
)
_response = await aiService.callAi(_request)
if not _response or not getattr(_response, 'content', None):
raise RuntimeError(
"Neutralization AI call returned no response "
"(no model available for NEUTRALIZATION_TEXT?)"
)
if getattr(_response, 'errorCount', 0) > 0 or getattr(_response, 'modelName', '') == 'error':
raise RuntimeError(
f"Neutralization AI call failed: {_response.content}"
)
_content = _response.content.strip()
if _content.startswith("```"):
_content = _content.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
try:
return json.loads(_content)
except json.JSONDecodeError:
_bracket = _content.find("[")
if _bracket >= 0:
try:
return json.loads(_content[_bracket:])
except json.JSONDecodeError:
pass
return []
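    # Example of the findings list expected back from the model (illustrative;
    # per the prompt rules, each 'text' must be a verbatim substring of the chunk):
    #   [{"text": "Max Mustermann", "type": "name"},
    #    {"text": "max@example.com", "type": "email"}]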
async def processTextAsync(self, text: str, fileId: Optional[str] = None) -> Dict[str, Any]:
"""AI-powered text neutralization with automatic chunking.
If *text* exceeds the safe token budget for the neutralization model
it is split into smaller chunks, each analysed separately. Findings
are merged and de-duplicated before placeholder replacement.
Regex patterns run as a supplementary pass to catch anything the
model missed.
"""
import uuid as _uuid
aiService = None
if self._getService:
try:
aiService = self._getService("ai")
except Exception:
pass
aiMapping: Dict[str, str] = {}
if not aiService or not hasattr(aiService, 'callAi'):
raise RuntimeError("Neutralization requires an AI service but none is available")
if text.strip():
_neutModel = self._resolveNeutModel()
_maxChunkChars = self._calcMaxChunkChars(_neutModel)
logger.info(
f"processTextAsync: model={getattr(_neutModel, 'name', '?')}, "
f"contextLength={getattr(_neutModel, 'contextLength', '?')} tokens, "
f"maxChunkChars={_maxChunkChars}"
)
_chunks = self._splitTextIntoChunks(text, _maxChunkChars)
if len(_chunks) > 1:
logger.info(
f"processTextAsync: text ({len(text)} chars) "
f"split into {len(_chunks)} chunk(s) of max {_maxChunkChars} chars"
)
for _chunkIdx, _chunkText in enumerate(_chunks):
_findings = await self._analyseChunk(aiService, _chunkText)
if not isinstance(_findings, list):
continue
for _f in _findings:
if not isinstance(_f, dict):
continue
_origText = _f.get("text", "")
_patType = _f.get("type", "other").lower()
if not _origText or _origText not in text:
continue
if _origText in aiMapping:
continue
_uid = str(_uuid.uuid4())
_placeholder = f"[{_patType}.{_uid}]"
aiMapping[_origText] = _placeholder
logger.info(f"AI neutralization found {len(aiMapping)} item(s)"
+ (f" across {len(_chunks)} chunk(s)" if len(_chunks) > 1 else ""))
neutralizedText = text
for _orig, _ph in sorted(aiMapping.items(), key=lambda x: -len(x[0])):
neutralizedText = neutralizedText.replace(_orig, _ph)
        # Placeholder for the regex supplementary pass, which is currently
        # disabled (see _neutralizeTextLight for the regex-only variant).
        regexMapping: Dict[str, str] = {}
        finalText = neutralizedText
allMapping = {**aiMapping, **regexMapping}
if allMapping:
            _loop = asyncio.get_running_loop()
await _loop.run_in_executor(
None, self._persistAttributes, allMapping, fileId
)
logger.debug(f"processTextAsync: {len(allMapping)} attribute(s) persisted")
return {
'neutralized_text': finalText,
'mapping': allMapping,
'attributes': [
NeutralizationAttribute(original=k, placeholder=v)
for k, v in allMapping.items()
],
'processed_info': {'type': 'text', 'ai_findings': len(aiMapping), 'regex_findings': len(regexMapping)},
}
def processText(self, text: str, fileId: Optional[str] = None) -> Dict[str, Any]:
"""Sync wrapper around processTextAsync. Propagates errors."""
try:
return asyncio.run(self.processTextAsync(text, fileId))
except RuntimeError as _re:
if "cannot be called from a running event loop" in str(_re):
loop = asyncio.get_event_loop()
return loop.run_until_complete(self.processTextAsync(text, fileId))
raise
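    # Usage sketch (illustrative; assumes a service center whose resolver can
    # return an "ai" service):
    #   svc = NeutralizationService(serviceCenter, getServiceFn)
    #   result = svc.processText("Contact Max Mustermann at max@example.com")
    #   result['neutralized_text']  # e.g. "Contact [name.<uuid>] at [email.<uuid>]"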
def processFile(self, fileId: str) -> Dict[str, Any]:
"""Neutralize a file referenced by its fileId using component interface.
Supports text files directly; PDF/DOCX/XLSX/PPTX via extract -> neutralize -> generate."""
if not self.interfaceDbComponent:
raise ValueError("Component interface is required to process a file by fileId")
fileInfo = None
try:
fileInfo = self.interfaceDbComponent.getFile(fileId)
except Exception:
fileInfo = None
fileName = getattr(fileInfo, 'fileName', None) if fileInfo else None
mimeType = getattr(fileInfo, 'mimeType', None) if fileInfo else None
fileData = self.interfaceDbComponent.getFileData(fileId)
if not fileData:
raise ValueError(f"No file data found for fileId: {fileId}")
mime_lower = (mimeType or '').lower()
# Binary but extractable: PDF, DOCX, XLSX, PPTX
if mime_lower in EXTRACTABLE_BINARY_MIME_TYPES:
try:
result = asyncio.run(self._processBinaryFile(fileData, fileName or "document", mime_lower, fileId))
if result:
result['file_id'] = fileId
result['neutralized_file_name'] = f"neutralized_{fileName}" if fileName else "neutralized_document"
return result
except Exception as e:
logger.error(f"Binary file neutralization failed: {str(e)}")
return {
'file_id': fileId,
'is_binary': True,
'mime_type': mimeType or 'unknown',
'file_name': fileName or 'unknown',
'neutralized_text': None,
'processed_info': {'type': 'binary', 'status': 'error', 'error': str(e)}
}
# Binary but not extractable
if self._isBinaryMimeType(mimeType or ''):
return {
'file_id': fileId,
'is_binary': True,
'mime_type': mimeType or 'unknown',
'file_name': fileName or 'unknown',
'neutralized_text': None,
'processed_info': {'type': 'binary', 'status': 'skipped', 'message': 'File type not supported for neutralization'}
}
# Text-based file
        # textType is computed for reference only; processText operates on the
        # raw decoded text and does not take a content-type hint.
        textType = self._getContentTypeFromMime(mimeType or '')
try:
textContent = fileData.decode('utf-8')
except UnicodeDecodeError:
            decoded = None
            # Try cp1252 first (it can fail on undefined bytes); latin-1
            # accepts any byte sequence and therefore acts as the catch-all,
            # so the None check below is purely defensive.
            for enc in ['cp1252', 'latin-1']:
                try:
                    decoded = fileData.decode(enc)
                    break
                except UnicodeDecodeError:
                    continue
            if decoded is None:
                raise ValueError("Unable to decode file content as text.")
            textContent = decoded
result = self.processText(textContent, fileId)
if fileName:
result['neutralized_file_name'] = f"neutralized_{fileName}"
result['file_id'] = fileId
result['is_binary'] = False
return result
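    # Usage sketch (illustrative; assumes interfaceDbComponent is wired):
    #   out = svc.processFile(fileId)
    #   out['neutralized_text']   # for text-based files
    #   out['neutralized_bytes']  # for PDF/DOCX/XLSX/PPTX, when successful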
def processBinaryBytes(self, fileBytes: bytes, fileName: str, mimeType: str) -> Dict[str, Any]:
"""Neutralize binary file bytes (sync - use from sync callers). Uses asyncio.run when event loop not running."""
mime_lower = (mimeType or '').lower()
if mime_lower not in EXTRACTABLE_BINARY_MIME_TYPES:
return {
'neutralized_text': None,
'neutralized_bytes': None,
'is_binary': True,
'processed_info': {'type': 'binary', 'status': 'skipped', 'message': 'File type not supported'}
}
try:
return asyncio.run(self._processBinaryFile(fileBytes, fileName, mime_lower, None))
except Exception as e:
logger.error(f"Binary neutralization failed: {str(e)}")
return {
'neutralized_text': None,
'neutralized_bytes': None,
'is_binary': True,
'processed_info': {'type': 'binary', 'status': 'error', 'error': str(e)}
}
async def processBinaryBytesAsync(self, fileBytes: bytes, fileName: str, mimeType: str) -> Dict[str, Any]:
"""Neutralize binary file bytes (async - use from async routes to avoid event loop conflict)."""
mime_lower = (mimeType or '').lower()
if mime_lower not in EXTRACTABLE_BINARY_MIME_TYPES:
return {
'neutralized_text': None,
'neutralized_bytes': None,
'is_binary': True,
'processed_info': {'type': 'binary', 'status': 'skipped', 'message': 'File type not supported'}
}
try:
return await self._processBinaryFile(fileBytes, fileName, mime_lower, None)
except Exception as e:
logger.error(f"Binary neutralization failed: {str(e)}")
return {
'neutralized_text': None,
'neutralized_bytes': None,
'is_binary': True,
'processed_info': {'type': 'binary', 'status': 'error', 'error': str(e)}
}
async def processImageAsync(self, imageBytes: bytes, fileName: str, mimeType: str = "image/png") -> Dict[str, Any]:
"""Analyze image via internal vision model to check for sensitive content.
Returns dict with:
- 'status': 'ok' | 'blocked' | 'error'
- 'hasSensitiveContent': bool
- 'analysis': str (model's analysis text, if available)
- 'processed_info': dict with details
Uses NEUTRALIZATION_IMAGE operation type → only internal Private-LLM models.
If no internal model available → returns 'blocked'.
"""
import base64
try:
aiService = None
if self._getService:
try:
aiService = self._getService("ai")
except Exception:
pass
if not aiService or not hasattr(aiService, 'callAi'):
logger.warning(f"processImage: AI service not available — blocking image '{fileName}'")
return {
'status': 'blocked',
'hasSensitiveContent': True,
'analysis': '',
'processed_info': {'type': 'image', 'status': 'blocked', 'reason': 'AI service unavailable'}
}
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
_b64Data = base64.b64encode(imageBytes).decode('utf-8')
_dataUrl = f"data:{mimeType};base64,{_b64Data}"
_prompt = (
"Analyze this image for personally identifiable information (PII). "
"Check for: names, addresses, phone numbers, email addresses, ID numbers, "
"faces, signatures, handwritten text, license plates, financial data. "
"Respond with JSON: {\"hasPII\": true/false, \"findings\": [\"...\"]}"
)
_request = AiCallRequest(
prompt=_prompt,
options=AiCallOptions(operationType=OperationTypeEnum.NEUTRALIZATION_IMAGE),
messages=[{"role": "user", "content": [
{"type": "text", "text": _prompt},
{"type": "image_url", "image_url": {"url": _dataUrl}},
]}],
)
_response = await aiService.callAi(_request)
_hasPII = False
_analysis = _response.content if _response and hasattr(_response, 'content') else ''
if _analysis:
_lowerAnalysis = _analysis.lower()
if '"haspii": true' in _lowerAnalysis or '"haspii":true' in _lowerAnalysis:
_hasPII = True
return {
'status': 'blocked' if _hasPII else 'ok',
'hasSensitiveContent': _hasPII,
'analysis': _analysis,
'processed_info': {'type': 'image', 'status': 'blocked' if _hasPII else 'ok', 'fileName': fileName}
}
except Exception as e:
logger.error(f"processImage failed for '{fileName}': {e}")
return {
'status': 'blocked',
'hasSensitiveContent': True,
'analysis': '',
'processed_info': {'type': 'image', 'status': 'error', 'error': str(e)}
}
def processImage(self, imageBytes: bytes, fileName: str, mimeType: str = "image/png") -> Dict[str, Any]:
"""Sync wrapper for processImageAsync. Uses asyncio.run when no event loop is running."""
try:
return asyncio.run(self.processImageAsync(imageBytes, fileName, mimeType))
except RuntimeError:
loop = asyncio.get_event_loop()
return loop.run_until_complete(self.processImageAsync(imageBytes, fileName, mimeType))
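    # Usage sketch (illustrative):
    #   with open("scan.png", "rb") as fh:
    #       check = svc.processImage(fh.read(), "scan.png", "image/png")
    #   if check['hasSensitiveContent']:
    #       ...  # block or strip the image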
    def resolveText(self, text: str) -> str:
        """Replace '[type.uuid]' placeholders in *text* with the stored
        original values; returns the input unchanged on any failure."""
if not self.interfaceNeutralizer:
return text
try:
placeholderPattern = r'\[([a-z]+)\.([a-f0-9-]{36})\]'
matches = re.findall(placeholderPattern, text)
resolvedText = text
for placeholderType, uid in matches:
attribute = self.interfaceNeutralizer.getAttributeById(uid)
if attribute:
placeholder = f"[{placeholderType}.{uid}]"
resolvedText = resolvedText.replace(placeholder, attribute["originalText"])
return resolvedText
except Exception:
return text
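    # Example round trip (illustrative): if an attribute with id
    # "123e4567-e89b-12d3-a456-426614174000" stores originalText "Max", then
    # resolveText("Hi [name.123e4567-e89b-12d3-a456-426614174000]") -> "Hi Max".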
def getAttributes(self) -> List[DataNeutralizerAttributes]:
"""Get all neutralization attributes for the current user's mandate"""
if not self.interfaceNeutralizer:
return []
try:
# Use the interface method which properly converts dicts to objects
return self.interfaceNeutralizer.getNeutralizationAttributes()
except Exception as e:
logger.error(f"Error getting neutralization attributes: {str(e)}")
return []
def deleteNeutralizationAttributes(self, fileId: str) -> bool:
"""Delete neutralization attributes for a specific file"""
if not self.interfaceNeutralizer:
return False
return self.interfaceNeutralizer.deleteNeutralizationAttributes(fileId)
def getSnapshots(self):
if not self.interfaceNeutralizer:
return []
return self.interfaceNeutralizer.getSnapshots()
def clearSnapshots(self) -> int:
if not self.interfaceNeutralizer:
return 0
return self.interfaceNeutralizer.clearSnapshots()
def saveSnapshot(self, sourceLabel: str, neutralizedText: str, placeholderCount: int = 0):
if not self.interfaceNeutralizer:
logger.warning("saveSnapshot: interfaceNeutralizer is None — snapshot not stored")
return None
return self.interfaceNeutralizer.createSnapshot(sourceLabel, neutralizedText, placeholderCount)
def _persistAttributes(self, mapping: Dict[str, str], fileId: Optional[str]) -> None:
"""Persist mapping to DB for resolve to work. mapping: originalText -> placeholder e.g. '[email.uuid]'"""
if not self.interfaceNeutralizer or not mapping:
return
placeholder_re = re.compile(r'^\[([a-z]+)\.([a-f0-9-]{36})\]$')
for original_text, placeholder in mapping.items():
m = placeholder_re.match(placeholder)
if m:
pattern_type, uid = m.group(1), m.group(2)
try:
self.interfaceNeutralizer.createAttribute(
attributeId=uid,
originalText=original_text,
patternType=pattern_type,
fileId=fileId
)
except Exception as e:
logger.debug(f"Could not persist attribute {uid}: {e}")
async def _processBinaryFile(
self,
fileBytes: bytes,
fileName: str,
mimeType: str,
fileId: Optional[str]
) -> Dict[str, Any]:
"""Extract -> neutralize -> adapt -> generate for PDF/DOCX/XLSX/PPTX."""
from modules.serviceCenter.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction
from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy
# Ensure registries exist
if ExtractionService._sharedExtractorRegistry is None:
ExtractionService(self.services)
registry = ExtractionService._sharedExtractorRegistry
chunker = ExtractionService._sharedChunkerRegistry
opts = ExtractionOptions(prompt="neutralize", mergeStrategy=MergeStrategy(preserveChunks=True))
# 1. Extract
extracted = runExtraction(registry, chunker, fileBytes, fileName, mimeType, opts)
parts = extracted.parts if hasattr(extracted, 'parts') else []
if not parts:
return {
'neutralized_text': None,
'neutralized_bytes': None,
'is_binary': True,
'processed_info': {'type': 'binary', 'status': 'error', 'error': 'No content extracted'}
}
# 2. Neutralize each text/table part
all_mapping: Dict[str, str] = {}
neutralized_parts: List[Any] = []
neutralization_error: Optional[str] = None
for part in parts:
            p = (part if isinstance(part, dict)
                 else part.model_dump() if hasattr(part, 'model_dump')
                 else part)
type_group = p.get('typeGroup', '')
data = p.get('data', '')
if type_group == 'binary' or not (data and str(data).strip()):
neutralized_parts.append(part)
continue
if type_group == 'image':
import base64 as _b64img
try:
_imgBytes = _b64img.b64decode(str(data))
_imgResult = await self.processImageAsync(_imgBytes, fileName)
if _imgResult.get("status") == "ok":
neutralized_parts.append(part)
else:
logger.warning(f"Image part blocked in binary file '{fileName}' (PII detected), removing")
except Exception as _imgErr:
logger.warning(f"Image check failed in binary file '{fileName}': {_imgErr}, removing (fail-safe)")
continue
nr = await self.processTextAsync(str(data), fileId)
proc = nr.get('processed_info', {}) or {}
if isinstance(proc, dict) and proc.get('type') == 'error':
neutralization_error = proc.get('error', 'Neutralization failed')
neu_text = nr.get('neutralized_text', str(data))
mapping = nr.get('mapping', {})
all_mapping.update(mapping)
new_part = {**p, 'data': neu_text}
neutralized_parts.append(new_part)
# 3. PDF: Use in-place only; no fallback to render
if mimeType == "application/pdf":
if neutralization_error:
logger.error(f"PDF neutralization aborted: {neutralization_error}")
return {
'neutralized_text': None,
'neutralized_bytes': None,
'is_binary': True,
'processed_info': {'type': 'binary', 'status': 'error', 'error': neutralization_error}
}
in_place_bytes = neutralize_pdf_in_place(fileBytes, all_mapping)
if in_place_bytes is not None:
logger.info("PDF neutralization completed via in-place redaction (layout preserved)")
return {
'neutralized_text': None,
'neutralized_bytes': in_place_bytes,
'neutralized_file_name': f"neutralized_{fileName}",
'is_binary': True,
'mime_type': 'application/pdf',
'attributes': [{'original': k, 'placeholder': v} for k, v in all_mapping.items()],
'processed_info': {'type': 'binary', 'status': 'success', 'format': 'pdf', 'method': 'in-place'}
}
logger.error("PDF in-place neutralization failed")
return {
'neutralized_text': None,
'neutralized_bytes': None,
'is_binary': True,
'processed_info': {'type': 'binary', 'status': 'error', 'error': 'PDF in-place neutralization failed'}
}
# 4. Adapter: ContentPart list -> renderer schema (non-PDF only)
schema = content_parts_to_renderer_schema(neutralized_parts, title=fileName or "Neutralized")
# 5. Render to format
renderer, output_mime = self._getRendererForMime(mimeType)
if not renderer:
return {
'neutralized_text': None,
'neutralized_bytes': None,
'is_binary': True,
'processed_info': {'type': 'binary', 'status': 'error', 'error': f'No renderer for {mimeType}'}
}
try:
logger.info(f"Calling renderer.render for mime={mimeType}, renderer={type(renderer).__name__}")
rendered = await renderer.render(schema, fileName or "document", None, None)
logger.info(f"Renderer returned: type={type(rendered).__name__}, len={len(rendered) if rendered else 0}")
if not rendered or len(rendered) == 0:
logger.error("Renderer returned empty list")
return {
'neutralized_text': None,
'neutralized_bytes': None,
'is_binary': True,
'processed_info': {'type': 'binary', 'status': 'error', 'error': 'Render produced no output'}
}
doc = rendered[0]
logger.info(f"First doc: type={type(doc).__name__}, isinstance(dict)={isinstance(doc, dict)}, has documentData attr={hasattr(doc, 'documentData')}")
# Extract documentData: Pydantic v2 models may need model_dump() for reliable access
if isinstance(doc, dict):
doc_data = doc.get('documentData')
elif hasattr(doc, 'model_dump'):
d = doc.model_dump(mode='python')
doc_data = d.get('documentData')
else:
doc_data = getattr(doc, 'documentData', None)
logger.info(f"doc_data: type={type(doc_data).__name__ if doc_data is not None else 'None'}, len={len(doc_data) if doc_data else 0}")
if doc_data is None:
logger.error("Renderer returned document with no documentData")
return {
'neutralized_text': None,
'neutralized_bytes': None,
'is_binary': True,
'processed_info': {'type': 'binary', 'status': 'error', 'error': 'Renderer returned no data'}
}
if isinstance(doc_data, str):
doc_data = doc_data.encode('utf-8')
return {
'neutralized_text': None,
'neutralized_bytes': doc_data,
'neutralized_file_name': f"neutralized_{fileName}",
'is_binary': True,
'mime_type': output_mime,
'attributes': [{'original': k, 'placeholder': v} for k, v in all_mapping.items()],
'processed_info': {'type': 'binary', 'status': 'success', 'format': mimeType}
}
except Exception as e:
logger.error(f"Render failed for {mimeType}: {str(e)}", exc_info=True)
raise
def _getRendererForMime(self, mimeType: str):
"""Get renderer instance and output mime for the given input MIME type."""
from modules.serviceCenter.services.serviceGeneration.renderers.rendererPdf import RendererPdf
from modules.serviceCenter.services.serviceGeneration.renderers.rendererDocx import RendererDocx
from modules.serviceCenter.services.serviceGeneration.renderers.rendererXlsx import RendererXlsx
from modules.serviceCenter.services.serviceGeneration.renderers.rendererPptx import RendererPptx
mime_map = {
"application/pdf": (RendererPdf, "application/pdf"),
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": (RendererDocx, "application/vnd.openxmlformats-officedocument.wordprocessingml.document"),
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": (RendererXlsx, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"),
"application/vnd.openxmlformats-officedocument.presentationml.presentation": (RendererPptx, "application/vnd.openxmlformats-officedocument.presentationml.presentation"),
}
pair = mime_map.get(mimeType)
if not pair:
return None, None
cls, out_mime = pair
renderer = cls(self.services)
return renderer, out_mime
def _reloadNamesFromConfig(self) -> None:
"""Reload names from config and update processors"""
try:
config = self.getConfig()
if not config:
return
# Parse namesToParse string into list
names_list = []
if config.namesToParse:
names_list = [name.strip() for name in config.namesToParse.split('\n') if name.strip()]
# Update internal list
self.NamesToParse = names_list
# Recreate processors with updated names
self.textProcessor = TextProcessor(names_list)
self.listProcessor = ListProcessor(names_list)
logger.debug(f"Reloaded {len(names_list)} names from config")
except Exception as e:
logger.error(f"Error reloading names from config: {str(e)}")
# Continue with existing names if reload fails
# Helper functions
def _neutralizeTextLight(self, text: str) -> Dict[str, Any]:
"""Regex-only supplementary pass using already-initialised processors.
Unlike ``_neutralizeText`` this does **no** DB I/O
(``_reloadNamesFromConfig`` is skipped) so it is safe to call from
an async context without blocking the event-loop or risking a
DB-connection-pool deadlock during parallel document processing.
"""
try:
data, mapping, replaced_fields, processed_info = self.textProcessor.processTextContent(text)
neutralized_text = str(data)
attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()]
return NeutralizationResult(
neutralized_text=neutralized_text,
mapping=mapping,
attributes=attributes,
processed_info=processed_info,
).model_dump()
except Exception as e:
logger.warning(f"_neutralizeTextLight error: {e}")
return {'neutralized_text': text, 'mapping': {}, 'attributes': [], 'processed_info': {'type': 'error', 'error': str(e)}}
    def _neutralizeText(self, text: str, textType: Optional[str] = None) -> Dict[str, Any]:
"""Process text and return unified dict for API consumption."""
try:
self._reloadNamesFromConfig()
# Auto-detect content type if not provided
if textType is None:
textType = self.commonUtils.detectContentType(text)
# Check if content is binary data
if self.binaryProcessor.isBinaryContent(text):
data, mapping, replaced_fields, processed_info = self.binaryProcessor.processBinaryContent(text)
neutralized_text = text if isinstance(data, str) else str(data)
attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()]
return NeutralizationResult(
neutralized_text=neutralized_text,
mapping=mapping,
attributes=attributes,
processed_info=processed_info
).model_dump()
# Inline former _processData routing
if textType in ['csv', 'json', 'xml']:
if textType == 'csv':
data, mapping, replaced_fields, processed_info = self.listProcessor.processCsvContent(text)
elif textType == 'json':
data, mapping, replaced_fields, processed_info = self.listProcessor.processJsonContent(text)
else: # xml
data, mapping, replaced_fields, processed_info = self.listProcessor.processXmlContent(text)
else:
data, mapping, replaced_fields, processed_info = self.textProcessor.processTextContent(text)
# Stringify data consistently
if textType == 'csv':
try:
neutralized_text = data.to_csv(index=False)
except Exception:
neutralized_text = str(data)
elif textType == 'json':
neutralized_text = json.dumps(data, ensure_ascii=False)
elif textType == 'xml':
neutralized_text = str(data)
else:
neutralized_text = str(data)
attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()]
return NeutralizationResult(
neutralized_text=neutralized_text,
mapping=mapping,
attributes=attributes,
processed_info=processed_info
).model_dump()
except Exception as e:
logger.error(f"Error processing content: {str(e)}")
return NeutralizationResult(
neutralized_text='',
mapping={},
attributes=[],
processed_info={'type': 'error', 'error': str(e)}
).model_dump()
def _isBinaryMimeType(self, mime_type: str) -> bool:
"""Check if a MIME type represents binary content that cannot be neutralized as text"""
if not mime_type:
return False
mime_type_lower = mime_type.lower()
# Text-based MIME types that CAN be neutralized (explicit list)
text_mime_types = [
'text/plain', 'text/html', 'text/css', 'text/markdown', 'text/csv',
'text/javascript', 'text/xml', 'text/json',
'application/json', 'application/xml', 'application/javascript',
'application/csv'
]
# Check explicit text types first
if mime_type_lower in text_mime_types:
return False
# Text-based prefixes that can be neutralized
if mime_type_lower.startswith('text/'):
return False
# Binary MIME types that CANNOT be neutralized
binary_mime_prefixes = [
'image/', 'audio/', 'video/',
'application/pdf', 'application/zip',
'application/octet-stream', 'application/x-',
'application/vnd.', 'application/msword',
'application/vnd.ms-', 'application/vnd.openxmlformats-'
]
# Check if it's a binary type by prefix
if any(mime_type_lower.startswith(prefix) for prefix in binary_mime_prefixes):
return True
# Additional specific binary document types
binary_mime_types = [
'application/pdf', 'application/msword', 'application/vnd.ms-excel',
'application/vnd.ms-powerpoint',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'application/vnd.openxmlformats-officedocument.presentationml.presentation',
'application/zip', 'application/x-rar-compressed', 'application/x-7z-compressed',
'application/x-tar', 'application/gzip'
]
return mime_type_lower in binary_mime_types
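    # Examples (illustrative): _isBinaryMimeType("text/csv") -> False,
    # _isBinaryMimeType("application/json") -> False,
    # _isBinaryMimeType("application/pdf") -> True.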
    def _getContentTypeFromMime(self, mime_type: str) -> str:
        """Determine content type from MIME type for neutralization processing.
        Specific types are checked before falling back to plain text so that
        'text/csv' and 'text/xml' are routed to the CSV/XML branches."""
        mime_lower = (mime_type or '').lower()
        if mime_lower in ('text/csv', 'application/csv'):
            return 'csv'
        if mime_lower in ('application/json', 'text/json'):
            return 'json'
        if mime_lower in ('application/xml', 'text/xml'):
            return 'xml'
        return 'text'  # Default to text processing (covers all other types)
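    # Examples (illustrative): "text/csv" -> 'csv', "application/json" -> 'json',
    # "text/xml" -> 'xml', "text/plain" and unknown types -> 'text'.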