gateway/modules/workflows/methods/methodContext/actions/neutralizeData.py
2025-12-17 22:03:58 +01:00

256 lines
12 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Neutralize Data action for Context operations.
Neutralizes extracted content data from ContentExtracted documents.
"""
import logging
import time
from typing import Dict, Any
from modules.workflows.methods.methodBase import action
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelDocref import DocumentReferenceList
from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart
logger = logging.getLogger(__name__)
def _toDocumentReferenceList(documentListParam: Any) -> Any:
    """
    Convert the raw ``documentList`` parameter into a DocumentReferenceList.

    An existing DocumentReferenceList is returned as-is; a single string or a
    list is passed through ``DocumentReferenceList.from_string_list``.

    Returns:
    - The DocumentReferenceList, or None when the parameter type is unsupported
      (the caller reports the error).
    """
    if isinstance(documentListParam, DocumentReferenceList):
        return documentListParam
    if isinstance(documentListParam, str):
        return DocumentReferenceList.from_string_list([documentListParam])
    if isinstance(documentListParam, list):
        return DocumentReferenceList.from_string_list(documentListParam)
    return None


def _passthroughDocuments(chatDocuments: Any) -> list:
    """
    Wrap chat documents unchanged as ActionDocuments.

    Used when neutralization is disabled. Documents without ``documentData``
    are left out of the result.
    """
    actionDocuments = []
    for chatDoc in chatDocuments:
        documentData = getattr(chatDoc, 'documentData', None)
        if not documentData:
            continue
        actionDocuments.append(
            ActionDocument(
                documentName=getattr(chatDoc, 'fileName', 'unknown'),
                documentData=documentData,
                mimeType=getattr(chatDoc, 'mimeType', 'application/json'),
                validationMetadata={
                    "actionType": "context.neutralizeData",
                    "neutralized": False,
                    "reason": "Neutralization disabled"
                }
            )
        )
    return actionDocuments


def _asContentExtracted(documentData: Any, docNumber: int) -> Any:
    """
    Return ``documentData`` as a ContentExtracted object.

    A ContentExtracted instance is returned as-is and a dict is used as keyword
    arguments for the ContentExtracted constructor.

    Parameters:
    - documentData: The payload taken from a chat document.
    - docNumber (int): 1-based document position, used in log messages only.

    Returns:
    - The ContentExtracted object, or None (after logging a warning) when the
      payload cannot be interpreted.
    """
    if isinstance(documentData, ContentExtracted):
        return documentData
    if isinstance(documentData, dict):
        try:
            return ContentExtracted(**documentData)
        except Exception as e:
            logger.warning(f"Document {docNumber} documentData is not ContentExtracted: {str(e)}")
            return None
    logger.warning(f"Document {docNumber} documentData is not ContentExtracted or dict")
    return None


def _neutralizeParts(services: Any, contentExtracted: Any, operationId: str,
                     docIndex: int, docCount: int) -> list:
    """
    Neutralize the ``data`` field of every part of a ContentExtracted object.

    Parameters:
    - services: The action's service container; ``services.chat`` and
      ``services.neutralization`` are used.
    - contentExtracted: Object whose ``parts`` are processed in order.
    - operationId (str): Progress operation to report against.
    - docIndex (int): 0-based index of the current document.
    - docCount (int): Total number of documents (for the progress fraction).

    Returns:
    - A list with one entry per input part, in the original order. Parts that
      cannot be parsed, have no data, or whose neutralization fails are
      returned unchanged.
    """
    neutralizedParts = []
    for part in contentExtracted.parts:
        if not isinstance(part, ContentPart):
            if not isinstance(part, dict):
                # Unknown part representation: keep it untouched
                neutralizedParts.append(part)
                continue
            try:
                part = ContentPart(**part)
            except Exception as e:
                logger.warning(f"Could not parse ContentPart: {str(e)}")
                neutralizedParts.append(part)
                continue

        if not part.data:
            # No data to neutralize, keep original part
            neutralizedParts.append(part)
            continue

        try:
            services.chat.progressLogUpdate(
                operationId,
                0.3 + (docIndex / docCount) * 0.6,
                f"Neutralizing part {len(neutralizedParts) + 1} of document {docIndex + 1}"
            )
            neutralizationResult = services.neutralization.processText(part.data)
            if neutralizationResult and 'neutralized_text' in neutralizationResult:
                # Build a new part so the original object is not mutated
                neutralizedParts.append(
                    ContentPart(
                        id=part.id,
                        parentId=part.parentId,
                        label=part.label,
                        typeGroup=part.typeGroup,
                        mimeType=part.mimeType,
                        data=neutralizationResult['neutralized_text'],
                        metadata=part.metadata.copy() if part.metadata else {}
                    )
                )
            else:
                # NOTE(review): the original, un-neutralized part is passed
                # through here (and in the except branch below) while the
                # resulting document is still tagged "neutralized": True.
                # Confirm this best-effort behavior is intended.
                logger.warning(f"Neutralization did not return neutralized_text for part {part.id}")
                neutralizedParts.append(part)
        except Exception as e:
            logger.error(f"Error neutralizing part {part.id}: {str(e)}")
            # On error, use original part
            neutralizedParts.append(part)
    return neutralizedParts


@action
async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Neutralize data from ContentExtracted documents.

    This action takes documents containing ContentExtracted objects (from extractContent)
    and neutralizes the text data in ContentPart.data fields. When neutralization is
    disabled (or its configuration cannot be read) the documents are returned unchanged
    and tagged with "neutralized": False.

    Parameters:
    - documentList (list, required): Document reference(s) containing ContentExtracted objects.
      May be a DocumentReferenceList, a single string reference or a list.
    - parentOperationId (optional): Forwarded to the progress log as parent operation.

    Returns:
    - ActionResult with ActionDocument containing neutralized ContentExtracted objects,
      or a failure ActionResult when the input is invalid, no documents are found,
      no document could be processed, or an unexpected error occurs.
    """
    # Bound before the try block so the error handler can always reference them
    operationId = None
    progressOpened = False
    try:
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        operationId = f"context_neutralize_{workflowId}_{int(time.time())}"

        # Check if neutralization is enabled; an unreadable config counts as disabled
        neutralizationEnabled = False
        try:
            config = self.services.neutralization.getConfig()
            neutralizationEnabled = bool(config and config.enabled)
        except Exception as e:
            logger.debug(f"Could not check neutralization config: {str(e)}")
        if not neutralizationEnabled:
            logger.info("Neutralization is not enabled, returning documents unchanged")

        # Validate and convert the documentList parameter (shared by both paths)
        documentListParam = parameters.get("documentList")
        if not documentListParam:
            return ActionResult.isFailure(error="documentList is required")
        documentList = _toDocumentReferenceList(documentListParam)
        if documentList is None:
            return ActionResult.isFailure(error=f"Invalid documentList type: {type(documentListParam)}")

        if not neutralizationEnabled:
            # Disabled: return the original documents, no progress tracking
            chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
            if not chatDocuments:
                return ActionResult.isFailure(error="No documents found in documentList")
            return ActionResult.isSuccess(documents=_passthroughDocuments(chatDocuments))

        # Start progress tracking. The flag is set before the call so that a
        # partially opened operation is still closed by the error handler.
        parentOperationId = parameters.get('parentOperationId')
        progressOpened = True
        self.services.chat.progressLogStart(
            operationId,
            "Neutralizing data from documents",
            "Data Neutralization",
            f"Documents: {len(documentList.references)}",
            parentOperationId=parentOperationId
        )

        # Get ChatDocuments from documentList
        self.services.chat.progressLogUpdate(operationId, 0.2, "Loading documents")
        chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
        if not chatDocuments:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="No documents found in documentList")
        logger.info(f"Neutralizing data from {len(chatDocuments)} documents")

        # Process each document; a failing document is logged and skipped
        self.services.chat.progressLogUpdate(operationId, 0.3, "Processing documents")
        actionDocuments = []
        for i, chatDoc in enumerate(chatDocuments):
            try:
                documentData = getattr(chatDoc, 'documentData', None)
                if not documentData:
                    logger.warning(f"Document {i+1} has no documentData, skipping")
                    continue

                contentExtracted = _asContentExtracted(documentData, i + 1)
                if contentExtracted is None:
                    continue

                neutralizedParts = _neutralizeParts(
                    self.services, contentExtracted, operationId, i, len(chatDocuments)
                )
                neutralizedContentExtracted = ContentExtracted(
                    id=contentExtracted.id,
                    parts=neutralizedParts,
                    summary=contentExtracted.summary
                )

                # "or" also covers a fileName attribute that exists but is
                # None/empty, which would otherwise break the string handling
                originalFileName = getattr(chatDoc, 'fileName', None) or f"document_{i+1}.json"
                # rsplit returns the whole name when it contains no dot
                baseName = originalFileName.rsplit('.', 1)[0]
                actionDocuments.append(
                    ActionDocument(
                        documentName=f"{baseName}_neutralized_{contentExtracted.id}.json",
                        documentData=neutralizedContentExtracted,
                        mimeType="application/json",
                        validationMetadata={
                            "actionType": "context.neutralizeData",
                            "documentIndex": i,
                            "extractedId": contentExtracted.id,
                            "partCount": len(neutralizedParts),
                            "neutralized": True,
                            "originalFileName": originalFileName
                        }
                    )
                )
            except Exception as e:
                logger.error(f"Error processing document {i+1}: {str(e)}")
                # Continue with other documents
                continue

        if not actionDocuments:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="No valid ContentExtracted documents found to neutralize")
        self.services.chat.progressLogFinish(operationId, True)
        return ActionResult.isSuccess(documents=actionDocuments)
    except Exception as e:
        logger.error(f"Error in data neutralization: {str(e)}")
        # Close the progress operation only if one was opened
        if progressOpened:
            try:
                self.services.chat.progressLogFinish(operationId, False)
            except Exception as finishError:
                # Don't fail on progress logging errors
                logger.debug(f"Could not finish progress log: {str(finishError)}")
        return ActionResult.isFailure(error=str(e))