diff --git a/modules/workflows/methods/methodContext/actions/__init__.py b/modules/workflows/methods/methodContext/actions/__init__.py index 9059d6bc..1750882e 100644 --- a/modules/workflows/methods/methodContext/actions/__init__.py +++ b/modules/workflows/methods/methodContext/actions/__init__.py @@ -6,11 +6,13 @@ # Export all actions from .getDocumentIndex import getDocumentIndex from .extractContent import extractContent +from .neutralizeData import neutralizeData from .triggerPreprocessingServer import triggerPreprocessingServer __all__ = [ 'getDocumentIndex', 'extractContent', + 'neutralizeData', 'triggerPreprocessingServer', ] diff --git a/modules/workflows/methods/methodContext/actions/extractContent.py b/modules/workflows/methods/methodContext/actions/extractContent.py index 799ce61d..8c5fd5fb 100644 --- a/modules/workflows/methods/methodContext/actions/extractContent.py +++ b/modules/workflows/methods/methodContext/actions/extractContent.py @@ -12,7 +12,7 @@ from typing import Dict, Any from modules.workflows.methods.methodBase import action from modules.datamodels.datamodelChat import ActionResult, ActionDocument from modules.datamodels.datamodelDocref import DocumentReferenceList -from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy +from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy, ContentExtracted, ContentPart logger = logging.getLogger(__name__) @@ -108,6 +108,74 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult: # Pass operationId for hierarchical per-document progress logging extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions, operationId=operationId) + # Check if neutralization is enabled and should be applied automatically + neutralizationEnabled = False + try: + config = self.services.neutralization.getConfig() + neutralizationEnabled = config and config.enabled + except Exception as e: + logger.debug(f"Could not 
check neutralization config: {str(e)}") + + # Neutralize extracted data if enabled (for dynamic mode: after extraction, before AI processing) + if neutralizationEnabled: + self.services.chat.progressLogUpdate(operationId, 0.7, "Neutralizing extracted data") + logger.info("Neutralization enabled - neutralizing extracted content data") + + # Neutralize each ContentExtracted result + for extracted in extractedResults: + if extracted.parts: + neutralizedParts = [] + for part in extracted.parts: + if not isinstance(part, ContentPart): + # Try to parse as ContentPart if it's a dict + if isinstance(part, dict): + try: + part = ContentPart(**part) + except Exception as e: + logger.warning(f"Could not parse ContentPart: {str(e)}") + neutralizedParts.append(part) + continue + else: + neutralizedParts.append(part) + continue + + # Neutralize the data field if it contains text + if part.data: + try: + # Call neutralization service + neutralizationResult = self.services.neutralization.processText(part.data) + + if neutralizationResult and 'neutralized_text' in neutralizationResult: + # Replace data with neutralized text + neutralizedData = neutralizationResult['neutralized_text'] + + # Create new ContentPart with neutralized data + neutralizedPart = ContentPart( + id=part.id, + parentId=part.parentId, + label=part.label, + typeGroup=part.typeGroup, + mimeType=part.mimeType, + data=neutralizedData, + metadata=part.metadata.copy() if part.metadata else {} + ) + neutralizedParts.append(neutralizedPart) + else: + # Neutralization failed, use original part + logger.warning(f"Neutralization did not return neutralized_text for part {part.id}") + neutralizedParts.append(part) + except Exception as e: + logger.error(f"Error neutralizing part {part.id}: {str(e)}") + # On error, use original part + neutralizedParts.append(part) + else: + # No data to neutralize, keep original part + neutralizedParts.append(part) + + # Update extracted result with neutralized parts + extracted.parts = 
def _isNeutralizationEnabled(services: Any) -> bool:
    """Return True only when the neutralization config is readable and enabled.

    Any error while reading the config is treated as "disabled" (best-effort),
    matching the behaviour of the inline check this helper replaces.
    """
    try:
        config = services.neutralization.getConfig()
    except Exception as e:
        logger.debug(f"Could not check neutralization config: {str(e)}")
        return False
    # bool() so callers never receive the config object (or None) itself.
    return bool(config and config.enabled)


def _toDocumentReferenceList(documentListParam: Any):
    """Coerce the ``documentList`` parameter into a DocumentReferenceList.

    Accepts an existing DocumentReferenceList, a single reference string, or a
    list of reference strings. Returns None for any other type so the caller
    can report the error.
    """
    if isinstance(documentListParam, DocumentReferenceList):
        return documentListParam
    if isinstance(documentListParam, str):
        return DocumentReferenceList.from_string_list([documentListParam])
    if isinstance(documentListParam, list):
        return DocumentReferenceList.from_string_list(documentListParam)
    return None


def _neutralizeParts(services: Any, parts: Any, operationId: str, docIndex: int, docCount: int):
    """Neutralize the ``data`` field of every part of one document.

    Parameters:
    - services: service container providing ``neutralization.processText`` and
      ``chat.progressLogUpdate``.
    - parts: iterable of ContentPart objects (or dicts parseable as ContentPart);
      ``None`` is treated as "no parts".
    - operationId / docIndex / docCount: used only for progress reporting.

    Returns:
    - (neutralizedParts, rawCount): the resulting parts in original order, and the
      number of parts that were passed through WITHOUT being neutralized
      (service error, missing ``neutralized_text``, or an unparseable part).
      Parts with empty ``data`` are kept as-is and not counted as raw.
    """
    neutralizedParts = []
    rawCount = 0

    for part in parts or []:
        if not isinstance(part, ContentPart):
            parsed = None
            if isinstance(part, dict):
                try:
                    parsed = ContentPart(**part)
                except Exception as e:
                    logger.warning(f"Could not parse ContentPart: {str(e)}")
            if parsed is None:
                # Unknown shape: keep it untouched, but do not claim it was neutralized.
                neutralizedParts.append(part)
                rawCount += 1
                continue
            part = parsed

        if not part.data:
            # No data to neutralize, keep original part
            neutralizedParts.append(part)
            continue

        try:
            services.chat.progressLogUpdate(
                operationId,
                0.3 + (docIndex / docCount) * 0.6,
                f"Neutralizing part {len(neutralizedParts) + 1} of document {docIndex+1}"
            )

            neutralizationResult = services.neutralization.processText(part.data)

            if neutralizationResult and 'neutralized_text' in neutralizationResult:
                # Build a new part so the original object is never mutated.
                neutralizedParts.append(ContentPart(
                    id=part.id,
                    parentId=part.parentId,
                    label=part.label,
                    typeGroup=part.typeGroup,
                    mimeType=part.mimeType,
                    data=neutralizationResult['neutralized_text'],
                    metadata=part.metadata.copy() if part.metadata else {}
                ))
                continue

            logger.warning(f"Neutralization did not return neutralized_text for part {part.id}")
        except Exception as e:
            logger.error(f"Error neutralizing part {part.id}: {str(e)}")

        # Best-effort fallback: keep the original part, and count it so the
        # document metadata does not report it as neutralized.
        neutralizedParts.append(part)
        rawCount += 1

    return neutralizedParts, rawCount


@action
async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Neutralize data from ContentExtracted documents.

    This action takes documents containing ContentExtracted objects (from extractContent)
    and neutralizes the text data in ContentPart.data fields.

    Parameters:
    - documentList (list, required): Document reference(s) containing ContentExtracted objects.
    - parentOperationId (str, optional): Parent operation for hierarchical progress logging.

    Returns:
    - ActionResult with ActionDocument containing neutralized ContentExtracted objects.
      When neutralization is disabled, the original documentData is returned unchanged
      with ``validationMetadata["neutralized"] = False``.
      When enabled, ``validationMetadata["neutralized"]`` is True only if no part had to
      be passed through raw; ``unneutralizedPartCount`` reports how many were.
    - A failure ActionResult when documentList is missing/invalid, no documents are found,
      or no document contains usable ContentExtracted data.
    """
    operationId = None
    progressStarted = False  # only finish a progress log that was (attempted to be) started
    try:
        # Init progress logger
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        operationId = f"context_neutralize_{workflowId}_{int(time.time())}"

        neutralizationEnabled = _isNeutralizationEnabled(self.services)

        # Validate and normalize the documentList parameter (shared by both paths)
        documentListParam = parameters.get("documentList")
        if not documentListParam:
            return ActionResult.isFailure(error="documentList is required")

        documentList = _toDocumentReferenceList(documentListParam)
        if documentList is None:
            return ActionResult.isFailure(error=f"Invalid documentList type: {type(documentListParam)}")

        if not neutralizationEnabled:
            logger.info("Neutralization is not enabled, returning documents unchanged")

            chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
            if not chatDocuments:
                return ActionResult.isFailure(error="No documents found in documentList")

            # Pass the original documentData through; documents without data are omitted.
            passthroughDocuments = [
                ActionDocument(
                    documentName=getattr(chatDoc, 'fileName', 'unknown'),
                    documentData=chatDoc.documentData,
                    mimeType=getattr(chatDoc, 'mimeType', 'application/json'),
                    validationMetadata={
                        "actionType": "context.neutralizeData",
                        "neutralized": False,
                        "reason": "Neutralization disabled"
                    }
                )
                for chatDoc in chatDocuments
                if hasattr(chatDoc, 'documentData') and chatDoc.documentData
            ]
            return ActionResult.isSuccess(documents=passthroughDocuments)

        # Start progress tracking (flag set first so a failure inside
        # progressLogStart still triggers the cleanup attempt below)
        progressStarted = True
        self.services.chat.progressLogStart(
            operationId,
            "Neutralizing data from documents",
            "Data Neutralization",
            f"Documents: {len(documentList.references)}",
            parentOperationId=parameters.get('parentOperationId')
        )

        # Get ChatDocuments from documentList
        self.services.chat.progressLogUpdate(operationId, 0.2, "Loading documents")
        chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)

        if not chatDocuments:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="No documents found in documentList")

        logger.info(f"Neutralizing data from {len(chatDocuments)} documents")

        # Process each document
        self.services.chat.progressLogUpdate(operationId, 0.3, "Processing documents")
        actionDocuments = []
        docCount = len(chatDocuments)

        for i, chatDoc in enumerate(chatDocuments):
            try:
                documentData = getattr(chatDoc, 'documentData', None)
                if not documentData:
                    logger.warning(f"Document {i+1} has no documentData, skipping")
                    continue

                # Accept a ContentExtracted object or a dict that parses as one
                if isinstance(documentData, ContentExtracted):
                    contentExtracted = documentData
                elif isinstance(documentData, dict):
                    try:
                        contentExtracted = ContentExtracted(**documentData)
                    except Exception as e:
                        logger.warning(f"Document {i+1} documentData is not ContentExtracted: {str(e)}")
                        continue
                else:
                    logger.warning(f"Document {i+1} documentData is not ContentExtracted or dict")
                    continue

                neutralizedParts, rawCount = _neutralizeParts(
                    self.services, contentExtracted.parts, operationId, i, docCount
                )
                if rawCount:
                    logger.warning(
                        f"Document {i+1}: {rawCount} of {len(neutralizedParts)} parts could not be neutralized"
                    )

                # Create neutralized ContentExtracted object
                neutralizedContentExtracted = ContentExtracted(
                    id=contentExtracted.id,
                    parts=neutralizedParts,
                    summary=contentExtracted.summary
                )

                # `or` also covers a fileName attribute that exists but is None/empty
                originalFileName = getattr(chatDoc, 'fileName', None) or f"document_{i+1}.json"
                baseName = originalFileName.rsplit('.', 1)[0]
                documentName = f"{baseName}_neutralized_{contentExtracted.id}.json"

                validationMetadata = {
                    "actionType": "context.neutralizeData",
                    "documentIndex": i,
                    "extractedId": contentExtracted.id,
                    "partCount": len(neutralizedParts),
                    # Only claim neutralization when nothing was passed through raw
                    "neutralized": rawCount == 0,
                    "unneutralizedPartCount": rawCount,
                    "originalFileName": originalFileName
                }

                actionDocuments.append(ActionDocument(
                    documentName=documentName,
                    documentData=neutralizedContentExtracted,
                    mimeType="application/json",
                    validationMetadata=validationMetadata
                ))

            except Exception as e:
                # Continue with other documents
                logger.error(f"Error processing document {i+1}: {str(e)}")
                continue

        if not actionDocuments:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="No valid ContentExtracted documents found to neutralize")

        self.services.chat.progressLogFinish(operationId, True)

        return ActionResult.isSuccess(documents=actionDocuments)

    except Exception as e:
        logger.error(f"Error in data neutralization: {str(e)}")

        # Complete progress tracking with failure, but never fail on progress logging errors
        if progressStarted:
            try:
                self.services.chat.progressLogFinish(operationId, False)
            except Exception as finishError:
                logger.debug(f"Could not finish progress log {operationId}: {str(finishError)}")

        return ActionResult.isFailure(error=str(e))
b/modules/workflows/workflowManager.py index 1906c0f6..987f46bf 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -960,17 +960,13 @@ class WorkflowManager: async def _processFileIds(self, fileIds: List[str], messageId: str = None) -> List[ChatDocument]: """Process file IDs from existing files and return ChatDocument objects. - If neutralization is enabled, files are neutralized and new files are created with neutralized content. - If neutralization fails, the document is not included and an error is logged to ChatLog.""" - documents = [] - # Check if neutralization is enabled - neutralizationEnabled = False - try: - config = self.services.neutralization.getConfig() - neutralizationEnabled = config and config.enabled - except Exception as e: - logger.debug(f"Could not check neutralization config: {str(e)}") + NOTE: Neutralization is NOT performed here. For dynamic workflows, neutralization + should happen AFTER content extraction (in extractContent action) to neutralize + extracted data (ContentPart.data), not ChatDocuments. This ensures neutralization + happens after extraction but before AI processing. + """ + documents = [] workflow = self.services.workflow @@ -984,87 +980,23 @@ class WorkflowManager: originalFileName = fileInfo.get("fileName", "unknown") originalMimeType = fileInfo.get("mimeType", "application/octet-stream") - fileIdToUse = fileId - fileNameToUse = originalFileName fileSizeToUse = fileInfo.get("size", 0) - neutralizationFailed = False - # Neutralize file if enabled - if neutralizationEnabled: - try: - # Neutralize the file using the neutralization service - neutralizationResult = self.services.neutralization.processFile(fileId) - - # Check if file is binary (not neutralized) - if neutralizationResult.get('is_binary', False): - # Binary file - log INFO and use original file - infoMsg = f"File '{originalFileName}' (MIME type: {neutralizationResult.get('mime_type', 'unknown')}) is a binary file. 
Binary file neutralization will be implemented in the future. Using original file without neutralization." - logger.info(infoMsg) - self.services.chat.storeLog(workflow, { - "message": infoMsg, - "type": "info", - "status": "running", - "progress": 50 - }) - # Use original file (fileIdToUse already set to fileId) - elif neutralizationResult and 'neutralized_text' in neutralizationResult: - neutralizedText = neutralizationResult['neutralized_text'] - - # Create new file with neutralized content - neutralizedFileName = neutralizationResult.get('neutralized_file_name', f"neutralized_{originalFileName}") - neutralizedContentBytes = neutralizedText.encode('utf-8') - - # Create file in component storage - neutralizedFileItem = self.services.interfaceDbComponent.createFile( - name=neutralizedFileName, - mimeType=originalMimeType, - content=neutralizedContentBytes - ) - # Persist file data - self.services.interfaceDbComponent.createFileData(neutralizedFileItem.id, neutralizedContentBytes) - - # Use the neutralized file ID and actual size - fileIdToUse = neutralizedFileItem.id - fileNameToUse = neutralizedFileName - fileSizeToUse = len(neutralizedContentBytes) - - logger.info(f"Neutralized file {fileId} -> {fileIdToUse} ({fileNameToUse})") - else: - neutralizationFailed = True - errorMsg = f"Neutralization did not return neutralized_text for file '{originalFileName}' (ID: {fileId})" - logger.warning(errorMsg) - self.services.chat.storeLog(workflow, { - "message": errorMsg, - "type": "error", - "status": "error", - "progress": -1 - }) - except Exception as e: - neutralizationFailed = True - errorMsg = f"Failed to neutralize file '{originalFileName}' (ID: {fileId}): {str(e)}" - logger.error(errorMsg) - self.services.chat.storeLog(workflow, { - "message": errorMsg, - "type": "error", - "status": "error", - "progress": -1 - }) + # NOTE: Neutralization removed from here - it should happen in extractContent action + # after content extraction but before AI processing (for dynamic 
workflows) + # This ensures we neutralize extracted data (ContentPart.data), not ChatDocuments - # Only skip document if neutralization failed (not for binary files) - if not neutralizationFailed: - # Create document with file ID (neutralized or original) - document = ChatDocument( - id=str(uuid.uuid4()), - messageId=messageId or "", - fileId=fileIdToUse, - fileName=fileNameToUse, - fileSize=fileSizeToUse, - mimeType=originalMimeType - ) - documents.append(document) - logger.info(f"Processed file ID {fileId} -> {document.fileName} (using fileId: {fileIdToUse})") - else: - logger.warning(f"Skipping document for file ID {fileId} due to neutralization failure") + # Create document with original file ID (no neutralization) + document = ChatDocument( + id=str(uuid.uuid4()), + messageId=messageId or "", + fileId=fileId, + fileName=originalFileName, + fileSize=fileSizeToUse, + mimeType=originalMimeType + ) + documents.append(document) + logger.info(f"Processed file ID {fileId} -> {document.fileName}") except Exception as e: errorMsg = f"Error processing file ID {fileId}: {str(e)}" logger.error(errorMsg)