# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
|
|
|
|
import logging
|
|
import json
|
|
import requests
|
|
from typing import Dict, Any
|
|
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_DEFAULT_SEARCH_LIMIT = 1000
_MAX_SEARCH_LIMIT = 1000  # Microsoft Graph API has limits
_GRAPH_TIMEOUT_SECONDS = 30
_FOLDER_QUERY_PREFIX = "folder:"

# Hints logged for well-known Graph error status codes (400 is handled
# separately because it also logs the request details).
_GRAPH_STATUS_HINTS = {
    401: "Unauthorized (401) - Check access token and permissions",
    403: "Forbidden (403) - Check API permissions and scopes",
    429: "Too Many Requests (429) - Rate limit exceeded",
}


def _normalizeSearchLimit(rawLimit: Any) -> int:
    """Coerce a caller-supplied limit to an int in ``1.._MAX_SEARCH_LIMIT``.

    Anything that cannot be converted to an int, or that is not positive,
    falls back to the default; values above the maximum are clamped.
    """
    try:
        limit = int(rawLimit)
    except (ValueError, TypeError, OverflowError):
        logger.warning("Invalid limit value, using default value 1000")
        return _DEFAULT_SEARCH_LIMIT

    if limit <= 0:
        logger.warning(f"Invalid limit value ({limit}), using default value 1000")
        return _DEFAULT_SEARCH_LIMIT
    if limit > _MAX_SEARCH_LIMIT:
        logger.warning(f"Limit {limit} exceeds maximum (1000), using 1000")
        return _MAX_SEARCH_LIMIT
    return limit


def _keepMessagesInFolder(messages: list, folderId: str) -> list:
    """Return only the messages whose ``parentFolderId`` equals ``folderId``."""
    return [message for message in messages if message.get("parentFolderId") == folderId]


async def searchEmails(self, parameters: Dict[str, Any]) -> ActionResult:
    """Search the user's mailbox through the Microsoft Graph ``/me/messages`` endpoint.

    Keys read from ``parameters``:
        connectionReference: Required. Passed to
            ``self.connection.getMicrosoftConnection``.
        query: Required, non-empty search text. A query of the form
            ``folder:FolderName`` additionally restricts the results to
            that folder.
        folder: Folder to search, default ``"All"`` (no folder restriction).
        limit: Maximum number of results, default 1000. Non-numeric or
            non-positive values fall back to 1000; larger values are
            clamped to 1000.
        outputMimeType: Default ``"application/json"``. It is only logged and
            echoed in the validation metadata; the returned document is
            always JSON.

    Returns:
        A successful ``ActionResult`` carrying one JSON ``ActionDocument``
        with the search results, or ``ActionResult.isFailure(...)`` when
        validation, the connection lookup, or the Graph request fails.
        Exceptions are converted to failure results rather than propagated.
    """
    try:
        connectionReference = parameters.get("connectionReference")
        query = parameters.get("query")
        folder = parameters.get("folder", "All")
        outputMimeType = parameters.get("outputMimeType", "application/json")

        if not connectionReference:
            return ActionResult.isFailure(error="Connection reference is required")

        # Coerce before comparing: the raw value may be a string or None,
        # and comparing those with an int would raise TypeError.
        limit = _normalizeSearchLimit(parameters.get("limit", _DEFAULT_SEARCH_LIMIT))

        if not query or not query.strip():
            return ActionResult.isFailure(error="Search query is required and cannot be empty")

        # Detect a "folder:FolderName" query once; the name is reused when
        # the results are filtered below.
        strippedQuery = query.strip()
        folderSpecName = None
        if strippedQuery.lower().startswith(_FOLDER_QUERY_PREFIX):
            folderSpecName = strippedQuery[len(_FOLDER_QUERY_PREFIX):].strip()
            if not folderSpecName:
                return ActionResult.isFailure(
                    error="Invalid folder specification. Use format 'folder:FolderName'"
                )
            logger.info(f"Search query is a folder specification: {folderSpecName}")

        connection = self.connection.getMicrosoftConnection(connectionReference)
        if not connection:
            return ActionResult.isFailure(
                error="No valid Microsoft connection found for the provided connection reference"
            )

        try:
            graph_url = "https://graph.microsoft.com/v1.0"
            headers = {
                "Authorization": f"Bearer {connection['accessToken']}",
                "Content-Type": "application/json",
            }

            # Resolve the folder once; the ID is reused for post-filtering.
            restrictToFolder = bool(folder) and folder.lower() != "all"
            folder_id = None
            if restrictToFolder:
                folder_id = self.folderManagement.getFolderId(folder, connection)
                if folder_id:
                    logger.debug(f"Found folder ID for '{folder}': {folder_id}")
                else:
                    logger.warning(f"Could not find folder ID for '{folder}', using folder name directly")

            api_url = f"{graph_url}/me/messages"
            params = self.emailProcessing.buildSearchParameters(query, folder_id or folder, limit)

            logger.debug(f"Search query: '{query}'")
            logger.debug(f"Search folder: '{folder}'")
            logger.debug(f"Search parameters: {params}")
            logger.debug(f"API URL: {api_url}")

            # NOTE(review): requests.get is synchronous, so this coroutine
            # blocks while the call runs. The timeout bounds that wait.
            response = requests.get(
                api_url,
                headers=headers,
                params=params,
                timeout=_GRAPH_TIMEOUT_SECONDS,
            )

            if response.status_code != 200:
                try:
                    error_data = response.json()
                    logger.error(f"Microsoft Graph API error: {response.status_code} - {error_data}")
                except Exception:
                    # Error body was not JSON; log the raw text instead.
                    logger.error(f"Microsoft Graph API error: {response.status_code} - {response.text}")

                if response.status_code == 400:
                    logger.error("Bad Request (400) - Check search query format and parameters")
                    logger.error(f"Search query: '{query}'")
                    logger.error(f"Search parameters: {params}")
                    logger.error(f"API URL: {api_url}")
                elif response.status_code in _GRAPH_STATUS_HINTS:
                    logger.error(_GRAPH_STATUS_HINTS[response.status_code])

                # Handled by the generic handler below, which turns it into
                # a failure result.
                raise RuntimeError(
                    f"Microsoft Graph API returned {response.status_code}: {response.text}"
                )

            search_data = response.json()
            emails = search_data.get("value", [])

            # Apply folder filtering if needed and we used $search
            if restrictToFolder and "$search" in params:
                if folder_id:
                    emails = _keepMessagesInFolder(emails, folder_id)
                    logger.debug(f"Applied folder filtering: {len(emails)} emails found in folder {folder}")
                else:
                    # Fallback (less reliable): the folder could not be
                    # resolved, so compare the caller's value with each
                    # message's parentFolderId. Messages without folder
                    # information are kept.
                    emails = [
                        email
                        for email in emails
                        if not email.get("parentFolderId")
                        or email.get("parentFolderId") == folder
                    ]
                    logger.debug(f"Applied fallback folder filtering: {len(emails)} emails found in folder {folder}")

            # Special handling for folder specification queries
            if folderSpecName is not None:
                specFolderId = self.folderManagement.getFolderId(folderSpecName, connection)
                if specFolderId:
                    emails = _keepMessagesInFolder(emails, specFolderId)
                    logger.debug(
                        f"Applied folder specification filtering: {len(emails)} emails found in folder {folderSpecName}"
                    )
                else:
                    logger.warning(f"Could not find folder ID for folder specification: {folderSpecName}")

            search_result = {
                "query": query,
                "results": emails,
                "count": len(emails),
                "folder": folder,
                "limit": limit,
                "apiMetadata": {
                    "@odata.context": search_data.get("@odata.context"),
                    "@odata.count": search_data.get("@odata.count"),
                    "@odata.nextLink": search_data.get("@odata.nextLink"),
                },
                "searchParams": params,
            }

        except ImportError:
            logger.error("requests module not available")
            return ActionResult.isFailure(error="requests module not available")
        except Exception as e:
            logger.error(f"Error searching emails via Microsoft Graph API: {str(e)}")
            return ActionResult.isFailure(error=f"Failed to search emails: {str(e)}")

        # The extension/MIME type derived here is only logged; the document
        # built below is always serialized as JSON.
        mime_type_mapping = {
            "application/json": ".json",
            "text/plain": ".txt",
            "text/csv": ".csv",
        }
        output_extension = mime_type_mapping.get(outputMimeType, ".json")
        output_mime_type = outputMimeType
        logger.info(f"Using output format: {output_extension} ({output_mime_type})")

        result_data = {
            "connectionReference": connectionReference,
            "query": query,
            "folder": folder,
            "limit": limit,
            "searchResults": search_result,
            "connection": {
                "id": connection["id"],
                "authority": "microsoft",
                "reference": connectionReference,
            },
            "timestamp": self.services.utils.timestampGetUtc(),
        }

        validationMetadata = {
            "actionType": "outlook.searchEmails",
            "connectionReference": connectionReference,
            "query": query,
            "folder": folder,
            "limit": limit,
            "resultCount": search_result.get("count", 0),
            "outputMimeType": outputMimeType,
        }

        return ActionResult(
            success=True,
            documents=[
                ActionDocument(
                    documentName=f"outlook_email_search_{self._format_timestamp_for_filename()}.json",
                    documentData=json.dumps(result_data, indent=2),
                    mimeType="application/json",
                    validationMetadata=validationMetadata,
                )
            ],
        )

    except Exception as e:
        logger.error(f"Error searching emails: {str(e)}")
        return ActionResult.isFailure(error=str(e))
|
|
|