neutralization integrated into extractor

This commit is contained in:
ValueOn AG 2025-12-17 22:03:58 +01:00
parent 56d6ecf978
commit b2196bc6a3
5 changed files with 364 additions and 89 deletions

View file

@@ -6,11 +6,13 @@
# Export all actions
from .getDocumentIndex import getDocumentIndex
from .extractContent import extractContent
from .neutralizeData import neutralizeData
from .triggerPreprocessingServer import triggerPreprocessingServer
__all__ = [
'getDocumentIndex',
'extractContent',
'neutralizeData',
'triggerPreprocessingServer',
]

View file

@@ -12,7 +12,7 @@ from typing import Dict, Any
from modules.workflows.methods.methodBase import action
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelDocref import DocumentReferenceList
from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy
from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy, ContentExtracted, ContentPart
logger = logging.getLogger(__name__)
@@ -108,6 +108,74 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult:
# Pass operationId for hierarchical per-document progress logging
extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions, operationId=operationId)
# Check if neutralization is enabled and should be applied automatically
neutralizationEnabled = False
try:
config = self.services.neutralization.getConfig()
neutralizationEnabled = config and config.enabled
except Exception as e:
logger.debug(f"Could not check neutralization config: {str(e)}")
# Neutralize extracted data if enabled (for dynamic mode: after extraction, before AI processing)
if neutralizationEnabled:
self.services.chat.progressLogUpdate(operationId, 0.7, "Neutralizing extracted data")
logger.info("Neutralization enabled - neutralizing extracted content data")
# Neutralize each ContentExtracted result
for extracted in extractedResults:
if extracted.parts:
neutralizedParts = []
for part in extracted.parts:
if not isinstance(part, ContentPart):
# Try to parse as ContentPart if it's a dict
if isinstance(part, dict):
try:
part = ContentPart(**part)
except Exception as e:
logger.warning(f"Could not parse ContentPart: {str(e)}")
neutralizedParts.append(part)
continue
else:
neutralizedParts.append(part)
continue
# Neutralize the data field if it contains text
if part.data:
try:
# Call neutralization service
neutralizationResult = self.services.neutralization.processText(part.data)
if neutralizationResult and 'neutralized_text' in neutralizationResult:
# Replace data with neutralized text
neutralizedData = neutralizationResult['neutralized_text']
# Create new ContentPart with neutralized data
neutralizedPart = ContentPart(
id=part.id,
parentId=part.parentId,
label=part.label,
typeGroup=part.typeGroup,
mimeType=part.mimeType,
data=neutralizedData,
metadata=part.metadata.copy() if part.metadata else {}
)
neutralizedParts.append(neutralizedPart)
else:
# Neutralization failed, use original part
logger.warning(f"Neutralization did not return neutralized_text for part {part.id}")
neutralizedParts.append(part)
except Exception as e:
logger.error(f"Error neutralizing part {part.id}: {str(e)}")
# On error, use original part
neutralizedParts.append(part)
else:
# No data to neutralize, keep original part
neutralizedParts.append(part)
# Update extracted result with neutralized parts
extracted.parts = neutralizedParts
logger.info(f"Neutralized {len(neutralizedParts)} content parts")
# Build ActionDocuments from ContentExtracted results
self.services.chat.progressLogUpdate(operationId, 0.8, "Building result documents")
actionDocuments = []
@@ -129,6 +197,7 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult:
"documentIndex": i,
"extractedId": extracted.id,
"partCount": len(extracted.parts) if extracted.parts else 0,
"neutralized": neutralizationEnabled,
"originalFileName": originalDoc.fileName if originalDoc and hasattr(originalDoc, 'fileName') else None
}
actionDoc = ActionDocument(

View file

@@ -0,0 +1,256 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Neutralize Data action for Context operations.
Neutralizes extracted content data from ContentExtracted documents.
"""
import logging
import time
from typing import Dict, Any
from modules.workflows.methods.methodBase import action
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelDocref import DocumentReferenceList
from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart
logger = logging.getLogger(__name__)
def _resolveDocumentList(documentListParam):
    """Normalize the raw 'documentList' parameter into a DocumentReferenceList.

    Accepts a DocumentReferenceList, a single reference string, or a list of
    reference strings. Returns a tuple (documentList, error) where exactly one
    element is None. Error messages are kept identical to the previous inline
    versions so callers/logs are unaffected.
    """
    if not documentListParam:
        return None, "documentList is required"
    if isinstance(documentListParam, DocumentReferenceList):
        return documentListParam, None
    if isinstance(documentListParam, str):
        return DocumentReferenceList.from_string_list([documentListParam]), None
    if isinstance(documentListParam, list):
        return DocumentReferenceList.from_string_list(documentListParam), None
    return None, f"Invalid documentList type: {type(documentListParam)}"


def _neutralizeContentPart(services, part, onProgress=None):
    """Neutralize a single ContentPart's text data via the neutralization service.

    - `part` may be a ContentPart or a dict parseable as one; anything else is
      returned unchanged (best-effort, matching the previous inline behavior).
    - `onProgress`, if given, is invoked just before the (potentially slow)
      neutralization call, mirroring the original per-part progress update.
    - On any failure (no 'neutralized_text' in the result, or an exception) the
      ORIGINAL part is returned so a single bad part never loses data.
    """
    if not isinstance(part, ContentPart):
        if isinstance(part, dict):
            try:
                part = ContentPart(**part)
            except Exception as e:
                logger.warning(f"Could not parse ContentPart: {str(e)}")
                # Keep the unparseable dict as-is rather than dropping it
                return part
        else:
            # Unknown type: pass through untouched
            return part
    if not part.data:
        # Nothing to neutralize
        return part
    try:
        if onProgress is not None:
            onProgress()
        neutralizationResult = services.neutralization.processText(part.data)
        if neutralizationResult and 'neutralized_text' in neutralizationResult:
            # Build a fresh ContentPart so the original object stays unmodified
            return ContentPart(
                id=part.id,
                parentId=part.parentId,
                label=part.label,
                typeGroup=part.typeGroup,
                mimeType=part.mimeType,
                data=neutralizationResult['neutralized_text'],
                metadata=part.metadata.copy() if part.metadata else {}
            )
        logger.warning(f"Neutralization did not return neutralized_text for part {part.id}")
        return part
    except Exception as e:
        logger.error(f"Error neutralizing part {part.id}: {str(e)}")
        # On error, fall back to the original part
        return part


@action
async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Neutralize data from ContentExtracted documents.

    This action takes documents containing ContentExtracted objects (from
    extractContent) and neutralizes the text in each ContentPart.data field.
    If the neutralization service is disabled, the documents are returned
    unchanged (marked "neutralized": False in validationMetadata).

    Parameters:
        - documentList (list, required): Document reference(s) containing
          ContentExtracted objects.

    Returns:
        - ActionResult with ActionDocument(s) containing neutralized
          ContentExtracted objects, or a failure result with an error message.
    """
    try:
        # Init progress logger; operationId is created first so the outer
        # except handler can always finish the progress log safely.
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        operationId = f"context_neutralize_{workflowId}_{int(time.time())}"

        # Check if neutralization is enabled (best-effort: a missing/broken
        # config simply means "disabled")
        neutralizationEnabled = False
        try:
            config = self.services.neutralization.getConfig()
            neutralizationEnabled = config and config.enabled
        except Exception as e:
            logger.debug(f"Could not check neutralization config: {str(e)}")

        # Validate and normalize documentList once (previously duplicated for
        # the enabled and disabled paths)
        documentList, listError = _resolveDocumentList(parameters.get("documentList"))
        if listError:
            return ActionResult.isFailure(error=listError)

        if not neutralizationEnabled:
            logger.info("Neutralization is not enabled, returning documents unchanged")
            # Pass-through path: wrap originals as ActionDocuments, no progress log
            chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
            if not chatDocuments:
                return ActionResult.isFailure(error="No documents found in documentList")
            actionDocuments = []
            for chatDoc in chatDocuments:
                # Only forward documents that actually carry data
                if hasattr(chatDoc, 'documentData') and chatDoc.documentData:
                    actionDocuments.append(ActionDocument(
                        documentName=getattr(chatDoc, 'fileName', 'unknown'),
                        documentData=chatDoc.documentData,
                        mimeType=getattr(chatDoc, 'mimeType', 'application/json'),
                        validationMetadata={
                            "actionType": "context.neutralizeData",
                            "neutralized": False,
                            "reason": "Neutralization disabled"
                        }
                    ))
            return ActionResult.isSuccess(documents=actionDocuments)

        # Start progress tracking (enabled path only, as before)
        parentOperationId = parameters.get('parentOperationId')
        self.services.chat.progressLogStart(
            operationId,
            "Neutralizing data from documents",
            "Data Neutralization",
            f"Documents: {len(documentList.references)}",
            parentOperationId=parentOperationId
        )

        # Get ChatDocuments from documentList
        self.services.chat.progressLogUpdate(operationId, 0.2, "Loading documents")
        chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
        if not chatDocuments:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="No documents found in documentList")

        logger.info(f"Neutralizing data from {len(chatDocuments)} documents")

        # Process each document; per-document failures are logged and skipped
        self.services.chat.progressLogUpdate(operationId, 0.3, "Processing documents")
        actionDocuments = []
        for i, chatDoc in enumerate(chatDocuments):
            try:
                if not hasattr(chatDoc, 'documentData') or not chatDoc.documentData:
                    logger.warning(f"Document {i+1} has no documentData, skipping")
                    continue
                documentData = chatDoc.documentData

                # Coerce documentData into a ContentExtracted object
                if isinstance(documentData, ContentExtracted):
                    contentExtracted = documentData
                elif isinstance(documentData, dict):
                    try:
                        contentExtracted = ContentExtracted(**documentData)
                    except Exception as e:
                        logger.warning(f"Document {i+1} documentData is not ContentExtracted: {str(e)}")
                        continue
                else:
                    logger.warning(f"Document {i+1} documentData is not ContentExtracted or dict")
                    continue

                # Neutralize each ContentPart's data field (shared helper;
                # progress callback reproduces the original per-part update)
                neutralizedParts = []
                for part in contentExtracted.parts:
                    def _progressTick():
                        self.services.chat.progressLogUpdate(
                            operationId,
                            0.3 + (i / len(chatDocuments)) * 0.6,
                            f"Neutralizing part {len(neutralizedParts) + 1} of document {i+1}"
                        )
                    neutralizedParts.append(
                        _neutralizeContentPart(self.services, part, onProgress=_progressTick)
                    )

                # Create neutralized ContentExtracted object
                neutralizedContentExtracted = ContentExtracted(
                    id=contentExtracted.id,
                    parts=neutralizedParts,
                    summary=contentExtracted.summary
                )

                # Create ActionDocument with a derived name and metadata
                originalFileName = getattr(chatDoc, 'fileName', f"document_{i+1}.json")
                baseName = originalFileName.rsplit('.', 1)[0] if '.' in originalFileName else originalFileName
                documentName = f"{baseName}_neutralized_{contentExtracted.id}.json"
                validationMetadata = {
                    "actionType": "context.neutralizeData",
                    "documentIndex": i,
                    "extractedId": contentExtracted.id,
                    "partCount": len(neutralizedParts),
                    "neutralized": True,
                    "originalFileName": originalFileName
                }
                actionDocuments.append(ActionDocument(
                    documentName=documentName,
                    documentData=neutralizedContentExtracted,
                    mimeType="application/json",
                    validationMetadata=validationMetadata
                ))
            except Exception as e:
                logger.error(f"Error processing document {i+1}: {str(e)}")
                # Continue with other documents
                continue

        if not actionDocuments:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="No valid ContentExtracted documents found to neutralize")

        self.services.chat.progressLogFinish(operationId, True)
        return ActionResult.isSuccess(documents=actionDocuments)

    except Exception as e:
        logger.error(f"Error in data neutralization: {str(e)}")
        # Complete progress tracking with failure; never mask the real error
        # with a logging problem (was a bare `except:` — now narrowed)
        try:
            self.services.chat.progressLogFinish(operationId, False)
        except Exception:
            pass  # Don't fail on progress logging errors
        return ActionResult.isFailure(error=str(e))

View file

@@ -13,6 +13,7 @@ from .helpers.formatting import FormattingHelper
# Import actions
from .actions.getDocumentIndex import getDocumentIndex
from .actions.extractContent import extractContent
from .actions.neutralizeData import neutralizeData
from .actions.triggerPreprocessingServer import triggerPreprocessingServer
logger = logging.getLogger(__name__)
@@ -68,6 +69,20 @@ class MethodContext(MethodBase):
},
execute=extractContent.__get__(self, self.__class__)
),
"neutralizeData": WorkflowActionDefinition(
actionId="context.neutralizeData",
description="Neutralize extracted data from ContentExtracted documents (for use after extractContent)",
parameters={
"documentList": WorkflowActionParameter(
name="documentList",
type="List[str]",
frontendType=FrontendType.DOCUMENT_REFERENCE,
required=True,
description="Document reference(s) containing ContentExtracted objects to neutralize"
)
},
execute=neutralizeData.__get__(self, self.__class__)
),
"triggerPreprocessingServer": WorkflowActionDefinition(
actionId="context.triggerPreprocessingServer",
description="Trigger preprocessing server at customer tenant to update database with configuration",
@@ -104,5 +119,6 @@ class MethodContext(MethodBase):
# Register actions as methods (optional, für direkten Zugriff)
self.getDocumentIndex = getDocumentIndex.__get__(self, self.__class__)
self.extractContent = extractContent.__get__(self, self.__class__)
self.neutralizeData = neutralizeData.__get__(self, self.__class__)
self.triggerPreprocessingServer = triggerPreprocessingServer.__get__(self, self.__class__)

View file

@@ -960,17 +960,13 @@ class WorkflowManager:
async def _processFileIds(self, fileIds: List[str], messageId: str = None) -> List[ChatDocument]:
"""Process file IDs from existing files and return ChatDocument objects.
If neutralization is enabled, files are neutralized and new files are created with neutralized content.
If neutralization fails, the document is not included and an error is logged to ChatLog."""
documents = []
# Check if neutralization is enabled
neutralizationEnabled = False
try:
config = self.services.neutralization.getConfig()
neutralizationEnabled = config and config.enabled
except Exception as e:
logger.debug(f"Could not check neutralization config: {str(e)}")
NOTE: Neutralization is NOT performed here. For dynamic workflows, neutralization
should happen AFTER content extraction (in extractContent action) to neutralize
extracted data (ContentPart.data), not ChatDocuments. This ensures neutralization
happens after extraction but before AI processing.
"""
documents = []
workflow = self.services.workflow
@@ -984,87 +980,23 @@ class WorkflowManager:
originalFileName = fileInfo.get("fileName", "unknown")
originalMimeType = fileInfo.get("mimeType", "application/octet-stream")
fileIdToUse = fileId
fileNameToUse = originalFileName
fileSizeToUse = fileInfo.get("size", 0)
neutralizationFailed = False
# Neutralize file if enabled
if neutralizationEnabled:
try:
# Neutralize the file using the neutralization service
neutralizationResult = self.services.neutralization.processFile(fileId)
# Check if file is binary (not neutralized)
if neutralizationResult.get('is_binary', False):
# Binary file - log INFO and use original file
infoMsg = f"File '{originalFileName}' (MIME type: {neutralizationResult.get('mime_type', 'unknown')}) is a binary file. Binary file neutralization will be implemented in the future. Using original file without neutralization."
logger.info(infoMsg)
self.services.chat.storeLog(workflow, {
"message": infoMsg,
"type": "info",
"status": "running",
"progress": 50
})
# Use original file (fileIdToUse already set to fileId)
elif neutralizationResult and 'neutralized_text' in neutralizationResult:
neutralizedText = neutralizationResult['neutralized_text']
# Create new file with neutralized content
neutralizedFileName = neutralizationResult.get('neutralized_file_name', f"neutralized_{originalFileName}")
neutralizedContentBytes = neutralizedText.encode('utf-8')
# Create file in component storage
neutralizedFileItem = self.services.interfaceDbComponent.createFile(
name=neutralizedFileName,
mimeType=originalMimeType,
content=neutralizedContentBytes
)
# Persist file data
self.services.interfaceDbComponent.createFileData(neutralizedFileItem.id, neutralizedContentBytes)
# Use the neutralized file ID and actual size
fileIdToUse = neutralizedFileItem.id
fileNameToUse = neutralizedFileName
fileSizeToUse = len(neutralizedContentBytes)
logger.info(f"Neutralized file {fileId} -> {fileIdToUse} ({fileNameToUse})")
else:
neutralizationFailed = True
errorMsg = f"Neutralization did not return neutralized_text for file '{originalFileName}' (ID: {fileId})"
logger.warning(errorMsg)
self.services.chat.storeLog(workflow, {
"message": errorMsg,
"type": "error",
"status": "error",
"progress": -1
})
except Exception as e:
neutralizationFailed = True
errorMsg = f"Failed to neutralize file '{originalFileName}' (ID: {fileId}): {str(e)}"
logger.error(errorMsg)
self.services.chat.storeLog(workflow, {
"message": errorMsg,
"type": "error",
"status": "error",
"progress": -1
})
# NOTE: Neutralization removed from here - it should happen in extractContent action
# after content extraction but before AI processing (for dynamic workflows)
# This ensures we neutralize extracted data (ContentPart.data), not ChatDocuments
# Only skip document if neutralization failed (not for binary files)
if not neutralizationFailed:
# Create document with file ID (neutralized or original)
document = ChatDocument(
id=str(uuid.uuid4()),
messageId=messageId or "",
fileId=fileIdToUse,
fileName=fileNameToUse,
fileSize=fileSizeToUse,
mimeType=originalMimeType
)
documents.append(document)
logger.info(f"Processed file ID {fileId} -> {document.fileName} (using fileId: {fileIdToUse})")
else:
logger.warning(f"Skipping document for file ID {fileId} due to neutralization failure")
# Create document with original file ID (no neutralization)
document = ChatDocument(
id=str(uuid.uuid4()),
messageId=messageId or "",
fileId=fileId,
fileName=originalFileName,
fileSize=fileSizeToUse,
mimeType=originalMimeType
)
documents.append(document)
logger.info(f"Processed file ID {fileId} -> {document.fileName}")
except Exception as e:
errorMsg = f"Error processing file ID {fileId}: {str(e)}"
logger.error(errorMsg)