From b2196bc6a357a09be02e1c879666745613a00c10 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Wed, 17 Dec 2025 22:03:58 +0100
Subject: [PATCH] neutralization integrated into extractor
---
.../methods/methodContext/actions/__init__.py | 2 +
.../methodContext/actions/extractContent.py | 71 ++++-
.../methodContext/actions/neutralizeData.py | 256 ++++++++++++++++++
.../methods/methodContext/methodContext.py | 16 ++
modules/workflows/workflowManager.py | 108 ++------
5 files changed, 364 insertions(+), 89 deletions(-)
create mode 100644 modules/workflows/methods/methodContext/actions/neutralizeData.py
diff --git a/modules/workflows/methods/methodContext/actions/__init__.py b/modules/workflows/methods/methodContext/actions/__init__.py
index 9059d6bc..1750882e 100644
--- a/modules/workflows/methods/methodContext/actions/__init__.py
+++ b/modules/workflows/methods/methodContext/actions/__init__.py
@@ -6,11 +6,13 @@
# Export all actions
from .getDocumentIndex import getDocumentIndex
from .extractContent import extractContent
+from .neutralizeData import neutralizeData
from .triggerPreprocessingServer import triggerPreprocessingServer
__all__ = [
'getDocumentIndex',
'extractContent',
+ 'neutralizeData',
'triggerPreprocessingServer',
]
diff --git a/modules/workflows/methods/methodContext/actions/extractContent.py b/modules/workflows/methods/methodContext/actions/extractContent.py
index 799ce61d..8c5fd5fb 100644
--- a/modules/workflows/methods/methodContext/actions/extractContent.py
+++ b/modules/workflows/methods/methodContext/actions/extractContent.py
@@ -12,7 +12,7 @@ from typing import Dict, Any
from modules.workflows.methods.methodBase import action
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelDocref import DocumentReferenceList
-from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy
+from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy, ContentExtracted, ContentPart
logger = logging.getLogger(__name__)
@@ -108,6 +108,74 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult:
# Pass operationId for hierarchical per-document progress logging
extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions, operationId=operationId)
+ # Check if neutralization is enabled and should be applied automatically
+ neutralizationEnabled = False
+ try:
+ config = self.services.neutralization.getConfig()
+ neutralizationEnabled = config and config.enabled
+ except Exception as e:
+ logger.debug(f"Could not check neutralization config: {str(e)}")
+
+ # Neutralize extracted data if enabled (for dynamic mode: after extraction, before AI processing)
+ if neutralizationEnabled:
+ self.services.chat.progressLogUpdate(operationId, 0.7, "Neutralizing extracted data")
+ logger.info("Neutralization enabled - neutralizing extracted content data")
+
+ # Neutralize each ContentExtracted result
+ for extracted in extractedResults:
+ if extracted.parts:
+ neutralizedParts = []
+ for part in extracted.parts:
+ if not isinstance(part, ContentPart):
+ # Try to parse as ContentPart if it's a dict
+ if isinstance(part, dict):
+ try:
+ part = ContentPart(**part)
+ except Exception as e:
+ logger.warning(f"Could not parse ContentPart: {str(e)}")
+ neutralizedParts.append(part)
+ continue
+ else:
+ neutralizedParts.append(part)
+ continue
+
+ # Neutralize the data field if it contains text
+ if part.data:
+ try:
+ # Call neutralization service
+ neutralizationResult = self.services.neutralization.processText(part.data)
+
+ if neutralizationResult and 'neutralized_text' in neutralizationResult:
+ # Replace data with neutralized text
+ neutralizedData = neutralizationResult['neutralized_text']
+
+ # Create new ContentPart with neutralized data
+ neutralizedPart = ContentPart(
+ id=part.id,
+ parentId=part.parentId,
+ label=part.label,
+ typeGroup=part.typeGroup,
+ mimeType=part.mimeType,
+ data=neutralizedData,
+ metadata=part.metadata.copy() if part.metadata else {}
+ )
+ neutralizedParts.append(neutralizedPart)
+ else:
+ # Neutralization failed, use original part
+ logger.warning(f"Neutralization did not return neutralized_text for part {part.id}")
+ neutralizedParts.append(part)
+ except Exception as e:
+ logger.error(f"Error neutralizing part {part.id}: {str(e)}")
+ # On error, use original part
+ neutralizedParts.append(part)
+ else:
+ # No data to neutralize, keep original part
+ neutralizedParts.append(part)
+
+ # Update extracted result with neutralized parts
+ extracted.parts = neutralizedParts
+ logger.info(f"Neutralized {len(neutralizedParts)} content parts")
+
# Build ActionDocuments from ContentExtracted results
self.services.chat.progressLogUpdate(operationId, 0.8, "Building result documents")
actionDocuments = []
@@ -129,6 +197,7 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult:
"documentIndex": i,
"extractedId": extracted.id,
"partCount": len(extracted.parts) if extracted.parts else 0,
+ "neutralized": neutralizationEnabled,
"originalFileName": originalDoc.fileName if originalDoc and hasattr(originalDoc, 'fileName') else None
}
actionDoc = ActionDocument(
diff --git a/modules/workflows/methods/methodContext/actions/neutralizeData.py b/modules/workflows/methods/methodContext/actions/neutralizeData.py
new file mode 100644
index 00000000..240fe6b1
--- /dev/null
+++ b/modules/workflows/methods/methodContext/actions/neutralizeData.py
@@ -0,0 +1,256 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+
+"""
+Neutralize Data action for Context operations.
+Neutralizes extracted content data from ContentExtracted documents.
+"""
+
+import logging
+import time
+from typing import Dict, Any
+from modules.workflows.methods.methodBase import action
+from modules.datamodels.datamodelChat import ActionResult, ActionDocument
+from modules.datamodels.datamodelDocref import DocumentReferenceList
+from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart
+
+logger = logging.getLogger(__name__)
+
+def _coerceContentPart(part):
+    """Return part as a ContentPart, or None if it is neither one nor a dict that parses as one."""
+    if isinstance(part, ContentPart):
+        return part
+    if isinstance(part, dict):
+        try:
+            return ContentPart(**part)
+        except Exception as e:
+            logger.warning(f"Could not parse ContentPart: {str(e)}")
+    return None
+
+
+def _neutralizePart(neutralizationService, part):
+    """
+    Run one ContentPart through neutralizationService.processText.
+
+    Returns (resultPart, succeeded); on failure resultPart is the original part.
+    """
+    try:
+        neutralizationResult = neutralizationService.processText(part.data)
+        if neutralizationResult and 'neutralized_text' in neutralizationResult:
+            neutralizedPart = ContentPart(
+                id=part.id,
+                parentId=part.parentId,
+                label=part.label,
+                typeGroup=part.typeGroup,
+                mimeType=part.mimeType,
+                data=neutralizationResult['neutralized_text'],
+                metadata=part.metadata.copy() if part.metadata else {}
+            )
+            return neutralizedPart, True
+        logger.warning(f"Neutralization did not return neutralized_text for part {part.id}")
+    except Exception as e:
+        logger.error(f"Error neutralizing part {part.id}: {str(e)}")
+    return part, False
+
+
+@action
+async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult:
+    """
+    Neutralize data from ContentExtracted documents.
+
+    Takes documents containing ContentExtracted objects (from extractContent) and replaces
+    the text in each ContentPart.data field with the neutralization service's output.
+
+    Parameters:
+    - documentList (required): DocumentReferenceList, reference string, or list of
+      reference strings for documents containing ContentExtracted objects.
+    - parentOperationId (optional): forwarded to progressLogStart.
+
+    Returns:
+    - ActionResult with one ActionDocument per processed document. When neutralization is
+      disabled the documents are returned unchanged with "neutralized": False; otherwise
+      "neutralized" is True only if "failedPartCount" (parts passed through as-is) is 0.
+    - A failure ActionResult if documentList is missing or invalid, resolves to no
+      documents, or no document could be processed.
+    """
+    # Bound before the try block so the outer handler never reads an unbound name
+    operationId = None
+    progressStarted = False
+    try:
+        # Init progress logger
+        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
+        operationId = f"context_neutralize_{workflowId}_{int(time.time())}"
+
+        # Validate and normalize documentList once; both the disabled and the enabled path need it
+        documentListParam = parameters.get("documentList")
+        if not documentListParam:
+            return ActionResult.isFailure(error="documentList is required")
+
+        if isinstance(documentListParam, DocumentReferenceList):
+            documentList = documentListParam
+        elif isinstance(documentListParam, str):
+            documentList = DocumentReferenceList.from_string_list([documentListParam])
+        elif isinstance(documentListParam, list):
+            documentList = DocumentReferenceList.from_string_list(documentListParam)
+        else:
+            return ActionResult.isFailure(error=f"Invalid documentList type: {type(documentListParam)}")
+
+        # Check if neutralization is enabled (a failed config lookup counts as disabled)
+        neutralizationEnabled = False
+        try:
+            config = self.services.neutralization.getConfig()
+            neutralizationEnabled = bool(config and config.enabled)
+        except Exception as e:
+            logger.debug(f"Could not check neutralization config: {str(e)}")
+
+        if not neutralizationEnabled:
+            logger.info("Neutralization is not enabled, returning documents unchanged")
+            chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
+            if not chatDocuments:
+                return ActionResult.isFailure(error="No documents found in documentList")
+
+            # Return original documents as ActionDocuments; documents without documentData are left out
+            actionDocuments = []
+            for chatDoc in chatDocuments:
+                if hasattr(chatDoc, 'documentData') and chatDoc.documentData:
+                    actionDoc = ActionDocument(
+                        documentName=getattr(chatDoc, 'fileName', 'unknown'),
+                        documentData=chatDoc.documentData,
+                        mimeType=getattr(chatDoc, 'mimeType', 'application/json'),
+                        validationMetadata={
+                            "actionType": "context.neutralizeData",
+                            "neutralized": False,
+                            "reason": "Neutralization disabled"
+                        }
+                    )
+                    actionDocuments.append(actionDoc)
+            return ActionResult.isSuccess(documents=actionDocuments)
+
+        # Start progress tracking
+        parentOperationId = parameters.get('parentOperationId')
+        self.services.chat.progressLogStart(
+            operationId,
+            "Neutralizing data from documents",
+            "Data Neutralization",
+            f"Documents: {len(documentList.references)}",
+            parentOperationId=parentOperationId
+        )
+        progressStarted = True
+
+        # Get ChatDocuments from documentList
+        self.services.chat.progressLogUpdate(operationId, 0.2, "Loading documents")
+        chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
+
+        if not chatDocuments:
+            self.services.chat.progressLogFinish(operationId, False)
+            return ActionResult.isFailure(error="No documents found in documentList")
+
+        logger.info(f"Neutralizing data from {len(chatDocuments)} documents")
+
+        # Process each document
+        self.services.chat.progressLogUpdate(operationId, 0.3, "Processing documents")
+        actionDocuments = []
+
+        for i, chatDoc in enumerate(chatDocuments):
+            try:
+                # Extract ContentExtracted from documentData
+                if not hasattr(chatDoc, 'documentData') or not chatDoc.documentData:
+                    logger.warning(f"Document {i+1} has no documentData, skipping")
+                    continue
+
+                documentData = chatDoc.documentData
+                # Accept a ContentExtracted object or a dict that parses as one
+                if isinstance(documentData, ContentExtracted):
+                    contentExtracted = documentData
+                elif isinstance(documentData, dict):
+                    try:
+                        contentExtracted = ContentExtracted(**documentData)
+                    except Exception as e:
+                        logger.warning(f"Document {i+1} documentData is not ContentExtracted: {str(e)}")
+                        continue
+                else:
+                    logger.warning(f"Document {i+1} documentData is not ContentExtracted or dict")
+                    continue
+
+                # Neutralize each ContentPart's data field
+                neutralizedParts = []
+                failedPartCount = 0
+                for rawPart in contentExtracted.parts or []:
+                    part = _coerceContentPart(rawPart)
+                    if part is None:
+                        # Unknown shape: pass through untouched, but count it as not neutralized
+                        neutralizedParts.append(rawPart)
+                        failedPartCount += 1
+                        continue
+                    if not part.data:
+                        # No data to neutralize, keep original part
+                        neutralizedParts.append(part)
+                        continue
+
+                    try:
+                        self.services.chat.progressLogUpdate(
+                            operationId,
+                            0.3 + (i / len(chatDocuments)) * 0.6,
+                            f"Neutralizing part {len(neutralizedParts) + 1} of document {i+1}"
+                        )
+                    except Exception as e:
+                        # A progress logging error must not leave the part un-neutralized
+                        logger.debug(f"Could not update progress: {str(e)}")
+
+                    resultPart, succeeded = _neutralizePart(self.services.neutralization, part)
+                    neutralizedParts.append(resultPart)
+                    if not succeeded:
+                        failedPartCount += 1
+
+                # Create neutralized ContentExtracted object
+                neutralizedContentExtracted = ContentExtracted(
+                    id=contentExtracted.id,
+                    parts=neutralizedParts,
+                    summary=contentExtracted.summary
+                )
+
+                # Create ActionDocument; fall back to a generated name when fileName is missing or None
+                originalFileName = getattr(chatDoc, 'fileName', None) or f"document_{i+1}.json"
+                baseName = originalFileName.rsplit('.', 1)[0] if '.' in originalFileName else originalFileName
+                documentName = f"{baseName}_neutralized_{contentExtracted.id}.json"
+
+                validationMetadata = {
+                    "actionType": "context.neutralizeData",
+                    "documentIndex": i,
+                    "extractedId": contentExtracted.id,
+                    "partCount": len(neutralizedParts),
+                    # Only claim neutralization when no part was passed through as-is
+                    "neutralized": failedPartCount == 0,
+                    "failedPartCount": failedPartCount,
+                    "originalFileName": originalFileName
+                }
+
+                actionDoc = ActionDocument(
+                    documentName=documentName,
+                    documentData=neutralizedContentExtracted,
+                    mimeType="application/json",
+                    validationMetadata=validationMetadata
+                )
+                actionDocuments.append(actionDoc)
+            except Exception as e:
+                # Log and continue with other documents
+                logger.error(f"Error processing document {i+1}: {str(e)}")
+
+        if not actionDocuments:
+            self.services.chat.progressLogFinish(operationId, False)
+            return ActionResult.isFailure(error="No valid ContentExtracted documents found to neutralize")
+
+        self.services.chat.progressLogFinish(operationId, True)
+        return ActionResult.isSuccess(documents=actionDocuments)
+
+    except Exception as e:
+        logger.error(f"Error in data neutralization: {str(e)}")
+        # Complete progress tracking with failure, but only if it was started
+        if progressStarted:
+            try:
+                self.services.chat.progressLogFinish(operationId, False)
+            except Exception as finishError:
+                # Don't fail on progress logging errors
+                logger.debug(f"Could not finish progress log: {str(finishError)}")
+
+        return ActionResult.isFailure(error=str(e))
diff --git a/modules/workflows/methods/methodContext/methodContext.py b/modules/workflows/methods/methodContext/methodContext.py
index 5481f70b..a635764f 100644
--- a/modules/workflows/methods/methodContext/methodContext.py
+++ b/modules/workflows/methods/methodContext/methodContext.py
@@ -13,6 +13,7 @@ from .helpers.formatting import FormattingHelper
# Import actions
from .actions.getDocumentIndex import getDocumentIndex
from .actions.extractContent import extractContent
+from .actions.neutralizeData import neutralizeData
from .actions.triggerPreprocessingServer import triggerPreprocessingServer
logger = logging.getLogger(__name__)
@@ -68,6 +69,20 @@ class MethodContext(MethodBase):
},
execute=extractContent.__get__(self, self.__class__)
),
+ "neutralizeData": WorkflowActionDefinition(
+ actionId="context.neutralizeData",
+ description="Neutralize extracted data from ContentExtracted documents (for use after extractContent)",
+ parameters={
+ "documentList": WorkflowActionParameter(
+ name="documentList",
+ type="List[str]",
+ frontendType=FrontendType.DOCUMENT_REFERENCE,
+ required=True,
+ description="Document reference(s) containing ContentExtracted objects to neutralize"
+ )
+ },
+ execute=neutralizeData.__get__(self, self.__class__)
+ ),
"triggerPreprocessingServer": WorkflowActionDefinition(
actionId="context.triggerPreprocessingServer",
description="Trigger preprocessing server at customer tenant to update database with configuration",
@@ -104,5 +119,6 @@ class MethodContext(MethodBase):
# Register actions as methods (optional, für direkten Zugriff)
self.getDocumentIndex = getDocumentIndex.__get__(self, self.__class__)
self.extractContent = extractContent.__get__(self, self.__class__)
+ self.neutralizeData = neutralizeData.__get__(self, self.__class__)
self.triggerPreprocessingServer = triggerPreprocessingServer.__get__(self, self.__class__)
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index 1906c0f6..987f46bf 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -960,17 +960,13 @@ class WorkflowManager:
async def _processFileIds(self, fileIds: List[str], messageId: str = None) -> List[ChatDocument]:
"""Process file IDs from existing files and return ChatDocument objects.
- If neutralization is enabled, files are neutralized and new files are created with neutralized content.
- If neutralization fails, the document is not included and an error is logged to ChatLog."""
- documents = []
- # Check if neutralization is enabled
- neutralizationEnabled = False
- try:
- config = self.services.neutralization.getConfig()
- neutralizationEnabled = config and config.enabled
- except Exception as e:
- logger.debug(f"Could not check neutralization config: {str(e)}")
+ NOTE: Neutralization is NOT performed here. For dynamic workflows, neutralization
+ should happen AFTER content extraction (in extractContent action) to neutralize
+ extracted data (ContentPart.data), not ChatDocuments. This ensures neutralization
+ happens after extraction but before AI processing.
+ """
+ documents = []
workflow = self.services.workflow
@@ -984,87 +980,23 @@ class WorkflowManager:
originalFileName = fileInfo.get("fileName", "unknown")
originalMimeType = fileInfo.get("mimeType", "application/octet-stream")
- fileIdToUse = fileId
- fileNameToUse = originalFileName
fileSizeToUse = fileInfo.get("size", 0)
- neutralizationFailed = False
- # Neutralize file if enabled
- if neutralizationEnabled:
- try:
- # Neutralize the file using the neutralization service
- neutralizationResult = self.services.neutralization.processFile(fileId)
-
- # Check if file is binary (not neutralized)
- if neutralizationResult.get('is_binary', False):
- # Binary file - log INFO and use original file
- infoMsg = f"File '{originalFileName}' (MIME type: {neutralizationResult.get('mime_type', 'unknown')}) is a binary file. Binary file neutralization will be implemented in the future. Using original file without neutralization."
- logger.info(infoMsg)
- self.services.chat.storeLog(workflow, {
- "message": infoMsg,
- "type": "info",
- "status": "running",
- "progress": 50
- })
- # Use original file (fileIdToUse already set to fileId)
- elif neutralizationResult and 'neutralized_text' in neutralizationResult:
- neutralizedText = neutralizationResult['neutralized_text']
-
- # Create new file with neutralized content
- neutralizedFileName = neutralizationResult.get('neutralized_file_name', f"neutralized_{originalFileName}")
- neutralizedContentBytes = neutralizedText.encode('utf-8')
-
- # Create file in component storage
- neutralizedFileItem = self.services.interfaceDbComponent.createFile(
- name=neutralizedFileName,
- mimeType=originalMimeType,
- content=neutralizedContentBytes
- )
- # Persist file data
- self.services.interfaceDbComponent.createFileData(neutralizedFileItem.id, neutralizedContentBytes)
-
- # Use the neutralized file ID and actual size
- fileIdToUse = neutralizedFileItem.id
- fileNameToUse = neutralizedFileName
- fileSizeToUse = len(neutralizedContentBytes)
-
- logger.info(f"Neutralized file {fileId} -> {fileIdToUse} ({fileNameToUse})")
- else:
- neutralizationFailed = True
- errorMsg = f"Neutralization did not return neutralized_text for file '{originalFileName}' (ID: {fileId})"
- logger.warning(errorMsg)
- self.services.chat.storeLog(workflow, {
- "message": errorMsg,
- "type": "error",
- "status": "error",
- "progress": -1
- })
- except Exception as e:
- neutralizationFailed = True
- errorMsg = f"Failed to neutralize file '{originalFileName}' (ID: {fileId}): {str(e)}"
- logger.error(errorMsg)
- self.services.chat.storeLog(workflow, {
- "message": errorMsg,
- "type": "error",
- "status": "error",
- "progress": -1
- })
+ # NOTE: Neutralization removed from here - it should happen in extractContent action
+ # after content extraction but before AI processing (for dynamic workflows)
+ # This ensures we neutralize extracted data (ContentPart.data), not ChatDocuments
- # Only skip document if neutralization failed (not for binary files)
- if not neutralizationFailed:
- # Create document with file ID (neutralized or original)
- document = ChatDocument(
- id=str(uuid.uuid4()),
- messageId=messageId or "",
- fileId=fileIdToUse,
- fileName=fileNameToUse,
- fileSize=fileSizeToUse,
- mimeType=originalMimeType
- )
- documents.append(document)
- logger.info(f"Processed file ID {fileId} -> {document.fileName} (using fileId: {fileIdToUse})")
- else:
- logger.warning(f"Skipping document for file ID {fileId} due to neutralization failure")
+ # Create document with original file ID (no neutralization)
+ document = ChatDocument(
+ id=str(uuid.uuid4()),
+ messageId=messageId or "",
+ fileId=fileId,
+ fileName=originalFileName,
+ fileSize=fileSizeToUse,
+ mimeType=originalMimeType
+ )
+ documents.append(document)
+ logger.info(f"Processed file ID {fileId} -> {document.fileName}")
except Exception as e:
errorMsg = f"Error processing file ID {fileId}: {str(e)}"
logger.error(errorMsg)