From e11ab4ebc560ae7b799042b5e9bb0d346565d236 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 4 Nov 2025 15:50:16 +0100
Subject: [PATCH] Integrated neutralizer to MVP
---
modules/interfaces/interfaceDbChatObjects.py | 14 +-
.../mainServiceNeutralization.py | 11 +-
modules/workflows/methods/methodOutlook.py | 308 ++++++++++++------
modules/workflows/workflowManager.py | 103 +++++-
4 files changed, 314 insertions(+), 122 deletions(-)
diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py
index 08d0ee88..46fd4ba0 100644
--- a/modules/interfaces/interfaceDbChatObjects.py
+++ b/modules/interfaces/interfaceDbChatObjects.py
@@ -1230,9 +1230,12 @@ class ChatObjects:
allAutomations = self.db.getRecordset(AutomationDefinition)
filteredAutomations = self._uam(AutomationDefinition, allAutomations)
- # Compute status for each automation
+ # Compute status for each automation and normalize executionLogs
for automation in filteredAutomations:
automation["status"] = self._computeAutomationStatus(automation)
+ # Ensure executionLogs is always a list, not None
+ if automation.get("executionLogs") is None:
+ automation["executionLogs"] = []
# If no pagination requested, return all items
if pagination is None:
@@ -1272,6 +1275,9 @@ class ChatObjects:
automation = filtered[0]
automation["status"] = self._computeAutomationStatus(automation)
+ # Ensure executionLogs is always a list, not None
+ if automation.get("executionLogs") is None:
+ automation["executionLogs"] = []
return automation
except Exception as e:
logger.error(f"Error getting automation definition: {str(e)}")
@@ -1306,6 +1312,9 @@ class ChatObjects:
# Compute status
createdAutomation["status"] = self._computeAutomationStatus(createdAutomation)
+ # Ensure executionLogs is always a list, not None
+ if createdAutomation.get("executionLogs") is None:
+ createdAutomation["executionLogs"] = []
# Trigger sync (async, don't wait)
asyncio.create_task(self.syncAutomationEvents())
@@ -1334,6 +1343,9 @@ class ChatObjects:
# Compute status
updatedAutomation["status"] = self._computeAutomationStatus(updatedAutomation)
+ # Ensure executionLogs is always a list, not None
+ if updatedAutomation.get("executionLogs") is None:
+ updatedAutomation["executionLogs"] = []
# Trigger sync (async, don't wait)
asyncio.create_task(self.syncAutomationEvents())
diff --git a/modules/services/serviceNeutralization/mainServiceNeutralization.py b/modules/services/serviceNeutralization/mainServiceNeutralization.py
index 29644903..e475f1b3 100644
--- a/modules/services/serviceNeutralization/mainServiceNeutralization.py
+++ b/modules/services/serviceNeutralization/mainServiceNeutralization.py
@@ -34,6 +34,7 @@ class NeutralizationService:
"""
self.services = serviceCenter
self.interfaceDbApp = serviceCenter.interfaceDbApp
+ self.interfaceDbComponent = serviceCenter.interfaceDbComponent
# Initialize anonymization processors
self.NamesToParse = NamesToParse or []
@@ -61,19 +62,19 @@ class NeutralizationService:
return self._neutralizeText(text, 'text')
def processFile(self, fileId: str) -> Dict[str, Any]:
- """Neutralize a file referenced by its fileId using app interface."""
- if not self.interfaceDbApp:
- raise ValueError("User context is required to process a file by fileId")
+ """Neutralize a file referenced by its fileId using component interface."""
+ if not self.interfaceDbComponent:
+ raise ValueError("Component interface is required to process a file by fileId")
# Fetch file data and metadata
fileInfo = None
try:
# getFile returns an object; fallback to dict-like
- fileInfo = self.interfaceDbApp.getFile(fileId)
+ fileInfo = self.interfaceDbComponent.getFile(fileId)
except Exception:
fileInfo = None
fileName = getattr(fileInfo, 'fileName', None) if fileInfo else None
mimeType = getattr(fileInfo, 'mimeType', None) if fileInfo else None
- fileData = self.interfaceDbApp.getFileData(fileId)
+ fileData = self.interfaceDbComponent.getFileData(fileId)
if not fileData:
raise ValueError(f"No file data found for fileId: {fileId}")
diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py
index 0db25e74..4539452b 100644
--- a/modules/workflows/methods/methodOutlook.py
+++ b/modules/workflows/methods/methodOutlook.py
@@ -1337,7 +1337,7 @@ Return JSON:
draft_id = draft_data.get("id", "Unknown")
# Create draft result data with full draft information
- draft_result_data = {
+ draftResultData = {
"status": "draft",
"message": "Email draft created successfully with AI-generated content",
"draftId": draft_id,
@@ -1361,7 +1361,7 @@ Return JSON:
success=True,
documents=[ActionDocument(
documentName=f"ai_generated_email_draft_{self._format_timestamp_for_filename()}.json",
- documentData=json.dumps(draft_result_data, indent=2),
+ documentData=json.dumps(draftResultData, indent=2),
mimeType="application/json"
)]
)
@@ -1381,47 +1381,27 @@ Return JSON:
async def sendDraftEmail(self, parameters: Dict[str, Any]) -> ActionResult:
"""
GENERAL:
- - Purpose: Send a draft email using the draft email JSON data from action outlook.composeAndDraftEmailWithContext. This action is used to send the email after the email has been composed and drafted.
- - Input requirements: connectionReference (required); draftEmailJson (required).
- - Output format: JSON confirmation with sent mail metadata.
+ - Purpose: Send draft email(s) using draft email JSON document(s) from action outlook.composeAndDraftEmailWithContext.
+ - Input requirements: connectionReference (required); documentList with draft email JSON documents (required).
+ - Output format: JSON confirmation with sent mail metadata for all emails.
Parameters:
- connectionReference (str, required): Microsoft connection label.
- - draftEmailJson (str or dict, required): Draft email JSON data containing draftId or draftData with id field.
+ - documentList (list, required): Document reference(s) to draft emails in JSON format (outputs from outlook.composeAndDraftEmailWithContext function).
"""
try:
connectionReference = parameters.get("connectionReference")
- draftEmailJson = parameters.get("draftEmailJson")
+ documentList = parameters.get("documentList", [])
if not connectionReference:
return ActionResult.isFailure(error="Connection reference is required")
- if not draftEmailJson:
- return ActionResult.isFailure(error="Draft email JSON is required")
+ if not documentList:
+ return ActionResult.isFailure(error="documentList is required and cannot be empty")
- # Parse draft email JSON if it's a string
- if isinstance(draftEmailJson, str):
- try:
- draftEmailJson = json.loads(draftEmailJson)
- except json.JSONDecodeError:
- return ActionResult.isFailure(error="Invalid JSON format in draftEmailJson parameter")
-
- # Extract draft ID from the JSON
- draft_id = None
- if isinstance(draftEmailJson, dict):
- # Try to get draftId directly
- draft_id = draftEmailJson.get("draftId")
- # If not found, try to get it from draftData
- if not draft_id and "draftData" in draftEmailJson:
- draft_data = draftEmailJson.get("draftData")
- if isinstance(draft_data, dict):
- draft_id = draft_data.get("id")
- # If still not found, try id field directly
- if not draft_id:
- draft_id = draftEmailJson.get("id")
-
- if not draft_id:
- return ActionResult.isFailure(error="Could not extract draft ID from draftEmailJson. Ensure it contains 'draftId' or 'draftData.id' field")
+ # Convert single value to list if needed
+ if isinstance(documentList, str):
+ documentList = [documentList]
# Get Microsoft connection
connection = self._getMicrosoftConnection(connectionReference)
@@ -1433,81 +1413,199 @@ Return JSON:
if not permissions_ok:
return ActionResult.isFailure(error="Connection lacks necessary permissions for Outlook operations")
- # Send the draft email
- try:
- graph_url = "https://graph.microsoft.com/v1.0"
- headers = {
- "Authorization": f"Bearer {connection['accessToken']}",
- "Content-Type": "application/json"
- }
-
- send_url = f"{graph_url}/me/messages/{draft_id}/send"
- send_response = requests.post(send_url, headers=headers)
-
- # Extract email details from draft JSON for confirmation
- subject = draftEmailJson.get("subject", "Unknown")
- recipients = draftEmailJson.get("recipients", [])
- cc = draftEmailJson.get("cc", [])
- bcc = draftEmailJson.get("bcc", [])
- attachments_count = draftEmailJson.get("attachments", 0)
-
- if send_response.status_code in [200, 202, 204]:
- sent_confirmation_data = {
- "status": "sent",
- "message": "Email sent successfully",
- "draftId": draft_id,
- "subject": subject,
- "recipients": recipients,
- "cc": cc,
- "bcc": bcc,
- "attachments": attachments_count,
- "sentTimestamp": self.services.utils.timestampGetUtc(),
- "confirmation": "Email has been successfully sent to recipients"
- }
+ # Read draft email JSON documents from documentList
+ draftEmails = []
+ for docRef in documentList:
+ try:
+ # Get documents from document reference
+ chatDocuments = self.services.chat.getChatDocumentsFromDocumentList([docRef])
+ if not chatDocuments:
+ logger.warning(f"No documents found for reference: {docRef}")
+ continue
- logger.info(f"Email sent successfully. Draft ID: {draft_id}")
-
- return ActionResult(
- success=True,
- documents=[ActionDocument(
- documentName=f"sent_mail_confirmation_{self._format_timestamp_for_filename()}.json",
- documentData=json.dumps(sent_confirmation_data, indent=2),
- mimeType="application/json"
- )]
- )
- else:
- logger.error(f"Failed to send email. Status: {send_response.status_code}, Response: {send_response.text}")
-
- sent_confirmation_data = {
- "status": "error",
- "message": "Failed to send draft email",
- "draftId": draft_id,
- "subject": subject,
- "recipients": recipients,
- "sendError": {
- "statusCode": send_response.status_code,
- "response": send_response.text
- },
- "sentTimestamp": self.services.utils.timestampGetUtc(),
- "confirmation": "Email draft sending failed"
- }
-
- return ActionResult.isFailure(
- error=f"Failed to send email: {send_response.status_code} - {send_response.text}",
- documents=[ActionDocument(
- documentName=f"sent_mail_confirmation_{self._format_timestamp_for_filename()}.json",
- documentData=json.dumps(sent_confirmation_data, indent=2),
- mimeType="application/json"
- )]
- )
-
- except ImportError:
- logger.error("requests module not available")
- return ActionResult.isFailure(error="requests module not available")
- except Exception as e:
- logger.error(f"Error sending draft email via Microsoft Graph API: {str(e)}")
- return ActionResult.isFailure(error=f"Failed to send draft email: {str(e)}")
+ # Process each document in the reference
+ for doc in chatDocuments:
+ try:
+ # Read file data
+ fileId = getattr(doc, 'fileId', None)
+ if not fileId:
+ logger.warning(f"Document {doc.fileName} has no fileId")
+ continue
+
+ fileData = self.services.chat.getFileData(fileId)
+ if not fileData:
+ logger.warning(f"No file data found for document: {doc.fileName}")
+ continue
+
+ # Parse JSON content
+ if isinstance(fileData, bytes):
+ jsonContent = fileData.decode('utf-8')
+ else:
+ jsonContent = str(fileData)
+
+ # Parse JSON - handle both direct JSON and JSON wrapped in documentData
+ try:
+ draftEmailData = json.loads(jsonContent)
+
+ # If the JSON contains a 'documentData' field, extract it
+ if isinstance(draftEmailData, dict) and 'documentData' in draftEmailData:
+ documentDataStr = draftEmailData['documentData']
+ if isinstance(documentDataStr, str):
+ draftEmailData = json.loads(documentDataStr)
+
+ # Validate draft email structure
+ if not isinstance(draftEmailData, dict):
+ logger.warning(f"Document {doc.fileName} does not contain a valid draft email JSON object")
+ continue
+
+ draftId = draftEmailData.get("draftId")
+ if not draftId:
+ logger.warning(f"Document {doc.fileName} does not contain 'draftId' field")
+ continue
+
+ draftEmails.append({
+ "draftEmailJson": draftEmailData,
+ "draftId": draftId,
+ "sourceDocument": doc.fileName,
+ "sourceReference": docRef
+ })
+
+ except json.JSONDecodeError as e:
+ logger.error(f"Failed to parse JSON from document {doc.fileName}: {str(e)}")
+ continue
+
+ except Exception as e:
+ logger.error(f"Error processing document {doc.fileName}: {str(e)}")
+ continue
+
+ except Exception as e:
+ logger.error(f"Error reading documents from reference {docRef}: {str(e)}")
+ continue
+ if not draftEmails:
+ return ActionResult.isFailure(error="No valid draft email JSON documents found in documentList")
+
+ # Send all draft emails
+ graph_url = "https://graph.microsoft.com/v1.0"
+ headers = {
+ "Authorization": f"Bearer {connection['accessToken']}",
+ "Content-Type": "application/json"
+ }
+
+ sentResults = []
+ failedResults = []
+
+ for draftEmail in draftEmails:
+ draftEmailJson = draftEmail["draftEmailJson"]
+ draftId = draftEmail["draftId"]
+ sourceDocument = draftEmail["sourceDocument"]
+
+ try:
+ send_url = f"{graph_url}/me/messages/{draftId}/send"
+ sendResponse = requests.post(send_url, headers=headers)
+
+ # Extract email details from draft JSON for confirmation
+ subject = draftEmailJson.get("subject", "Unknown")
+ recipients = draftEmailJson.get("recipients", [])
+ cc = draftEmailJson.get("cc", [])
+ bcc = draftEmailJson.get("bcc", [])
+ attachmentsCount = draftEmailJson.get("attachments", 0)
+
+ if sendResponse.status_code in [200, 202, 204]:
+ sentResults.append({
+ "status": "sent",
+ "message": "Email sent successfully",
+ "draftId": draftId,
+ "subject": subject,
+ "recipients": recipients,
+ "cc": cc,
+ "bcc": bcc,
+ "attachments": attachmentsCount,
+ "sentTimestamp": self.services.utils.timestampGetUtc(),
+ "sourceDocument": sourceDocument
+ })
+ logger.info(f"Email sent successfully. Draft ID: {draftId}, Subject: {subject}")
+ else:
+ errorResult = {
+ "status": "error",
+ "message": "Failed to send draft email",
+ "draftId": draftId,
+ "subject": subject,
+ "recipients": recipients,
+ "sendError": {
+ "statusCode": sendResponse.status_code,
+ "response": sendResponse.text
+ },
+ "sentTimestamp": self.services.utils.timestampGetUtc(),
+ "sourceDocument": sourceDocument
+ }
+ failedResults.append(errorResult)
+ logger.error(f"Failed to send email. Draft ID: {draftId}, Status: {sendResponse.status_code}, Response: {sendResponse.text}")
+
+ except Exception as e:
+ errorResult = {
+ "status": "error",
+ "message": f"Exception while sending draft email: {str(e)}",
+ "draftId": draftId,
+ "subject": draftEmailJson.get("subject", "Unknown"),
+ "recipients": draftEmailJson.get("recipients", []),
+ "exception": str(e),
+ "sentTimestamp": self.services.utils.timestampGetUtc(),
+ "sourceDocument": sourceDocument
+ }
+ failedResults.append(errorResult)
+ logger.error(f"Error sending draft email {draftId}: {str(e)}")
+
+ # Build result summary
+ totalEmails = len(draftEmails)
+ successfulEmails = len(sentResults)
+ failedEmails = len(failedResults)
+
+ resultData = {
+ "totalEmails": totalEmails,
+ "successfulEmails": successfulEmails,
+ "failedEmails": failedEmails,
+ "sentResults": sentResults,
+ "failedResults": failedResults,
+ "timestamp": self.services.utils.timestampGetUtc()
+ }
+
+ # Determine overall success status
+ if successfulEmails == 0:
+ return ActionResult.isFailure(
+ error=f"Failed to send all {totalEmails} email(s)",
+ documents=[ActionDocument(
+ documentName=f"sent_mail_confirmation_{self._format_timestamp_for_filename()}.json",
+ documentData=json.dumps(resultData, indent=2),
+ mimeType="application/json"
+ )]
+ )
+ elif failedEmails > 0:
+ # Partial success
+ logger.warning(f"Sent {successfulEmails} out of {totalEmails} emails. {failedEmails} failed.")
+ return ActionResult(
+ success=True,
+ documents=[ActionDocument(
+ documentName=f"sent_mail_confirmation_{self._format_timestamp_for_filename()}.json",
+ documentData=json.dumps(resultData, indent=2),
+ mimeType="application/json"
+ )]
+ )
+ else:
+ # All successful
+ logger.info(f"Successfully sent all {totalEmails} email(s)")
+ return ActionResult(
+ success=True,
+ documents=[ActionDocument(
+ documentName=f"sent_mail_confirmation_{self._format_timestamp_for_filename()}.json",
+ documentData=json.dumps(resultData, indent=2),
+ mimeType="application/json"
+ )]
+ )
+
+ except ImportError:
+ logger.error("requests module not available")
+ return ActionResult.isFailure(error="requests module not available")
except Exception as e:
logger.error(f"Error in sendDraftEmail: {str(e)}")
return ActionResult.isFailure(error=str(e))
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index 03ce79c4..87b9b599 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -721,28 +721,109 @@ class WorkflowManager:
raise
async def _processFileIds(self, fileIds: List[str], messageId: str = None) -> List[ChatDocument]:
- """Process file IDs from existing files and return ChatDocument objects"""
+ """Process file IDs from existing files and return ChatDocument objects.
+ If neutralization is enabled, files are neutralized and new files are created with neutralized content.
+ If neutralization fails, the document is not included and an error is logged to ChatLog."""
documents = []
+
+ # Check if neutralization is enabled
+ neutralizationEnabled = False
+ try:
+ config = self.services.neutralization.getConfig()
+ neutralizationEnabled = config and config.enabled
+ except Exception as e:
+ logger.debug(f"Could not check neutralization config: {str(e)}")
+
+ workflow = self.services.workflow
+
for fileId in fileIds:
try:
# Get file info from chat service
fileInfo = self.services.chat.getFileInfo(fileId)
- if fileInfo:
- # Create document directly with all file attributes
+ if not fileInfo:
+ logger.warning(f"No file info found for file ID {fileId}")
+ continue
+
+ originalFileName = fileInfo.get("fileName", "unknown")
+ originalMimeType = fileInfo.get("mimeType", "application/octet-stream")
+ fileIdToUse = fileId
+ fileNameToUse = originalFileName
+ fileSizeToUse = fileInfo.get("size", 0)
+ neutralizationFailed = False
+
+ # Neutralize file if enabled
+ if neutralizationEnabled:
+ try:
+ # Neutralize the file using the neutralization service
+ neutralizationResult = self.services.neutralization.processFile(fileId)
+
+ if neutralizationResult and 'neutralized_text' in neutralizationResult:
+ neutralizedText = neutralizationResult['neutralized_text']
+
+ # Create new file with neutralized content
+ neutralizedFileName = neutralizationResult.get('neutralized_file_name', f"neutralized_{originalFileName}")
+ neutralizedContentBytes = neutralizedText.encode('utf-8')
+
+ # Create file in component storage
+ neutralizedFileItem = self.services.interfaceDbComponent.createFile(
+ name=neutralizedFileName,
+ mimeType=originalMimeType,
+ content=neutralizedContentBytes
+ )
+ # Persist file data
+ self.services.interfaceDbComponent.createFileData(neutralizedFileItem.id, neutralizedContentBytes)
+
+ # Use the neutralized file ID and actual size
+ fileIdToUse = neutralizedFileItem.id
+ fileNameToUse = neutralizedFileName
+ fileSizeToUse = len(neutralizedContentBytes)
+
+ logger.info(f"Neutralized file {fileId} -> {fileIdToUse} ({fileNameToUse})")
+ else:
+ neutralizationFailed = True
+ errorMsg = f"Neutralization did not return neutralized_text for file '{originalFileName}' (ID: {fileId})"
+ logger.warning(errorMsg)
+ self.services.chat.storeLog(workflow, {
+ "message": errorMsg,
+ "type": "error",
+ "status": "error",
+ "progress": -1
+ })
+ except Exception as e:
+ neutralizationFailed = True
+ errorMsg = f"Failed to neutralize file '{originalFileName}' (ID: {fileId}): {str(e)}"
+ logger.error(errorMsg)
+ self.services.chat.storeLog(workflow, {
+ "message": errorMsg,
+ "type": "error",
+ "status": "error",
+ "progress": -1
+ })
+
+ # Only add document if neutralization didn't fail (or if neutralization is disabled)
+ if not neutralizationFailed:
+ # Create document with file ID (neutralized or original)
document = ChatDocument(
id=str(uuid.uuid4()),
- messageId=messageId or "", # Use provided messageId or empty string as fallback
- fileId=fileId,
- fileName=fileInfo.get("fileName", "unknown"),
- fileSize=fileInfo.get("size", 0),
- mimeType=fileInfo.get("mimeType", "application/octet-stream")
+ messageId=messageId or "",
+ fileId=fileIdToUse,
+ fileName=fileNameToUse,
+ fileSize=fileSizeToUse,
+ mimeType=originalMimeType
)
documents.append(document)
- logger.info(f"Processed file ID {fileId} -> {document.fileName}")
+ logger.info(f"Processed file ID {fileId} -> {document.fileName} (using fileId: {fileIdToUse})")
else:
- logger.warning(f"No file info found for file ID {fileId}")
+ logger.warning(f"Skipping document for file ID {fileId} due to neutralization failure")
except Exception as e:
- logger.error(f"Error processing file ID {fileId}: {str(e)}")
+ errorMsg = f"Error processing file ID {fileId}: {str(e)}"
+ logger.error(errorMsg)
+ self.services.chat.storeLog(workflow, {
+ "message": errorMsg,
+ "type": "error",
+ "status": "error",
+ "progress": -1
+ })
return documents