"""Operator method implementation for handling collections and AI operations""" from typing import Dict, List, Any, Optional from datetime import datetime, UTC import logging import uuid from modules.workflow.methodBase import MethodBase, ActionResult, action logger = logging.getLogger(__name__) class MethodOperator(MethodBase): """Operator method implementation for data operations""" def __init__(self, serviceContainer: Any): super().__init__(serviceContainer) self.name = "operator" self.description = "Handle data operations like filtering, sorting, and transformation" @action async def filter(self, parameters: Dict[str, Any]) -> ActionResult: """ Filter data based on criteria Parameters: documentList (str): Reference to the document list to filter criteria (Dict[str, Any]): Filter criteria field (str, optional): Field to filter on """ try: documentList = parameters.get("documentList") criteria = parameters.get("criteria") field = parameters.get("field") if not documentList or not criteria: return self._createResult( success=False, data={}, error="Document list reference and criteria are required" ) chatDocuments = self.serviceContainer.getChatDocumentsFromDocumentReference(documentList) if not chatDocuments: return self._createResult( success=False, data={}, error="No documents found for the provided reference" ) # Extract content from all documents all_document_content = [] for chatDocument in chatDocuments: fileId = chatDocument.fileId file_data = self.serviceContainer.getFileData(fileId) file_info = self.serviceContainer.getFileInfo(fileId) if not file_data: logger.warning(f"File data not found for fileId: {fileId}") continue all_document_content.append({ "fileId": fileId, "fileName": file_info.get('name', 'unknown'), "content": file_data }) if not all_document_content: return self._createResult( success=False, data={}, error="No content could be extracted from any documents" ) # Combine all document content for filtering combined_content = "\n\n--- DOCUMENT SEPARATOR ---\n\n".join([ f"File: {doc['fileName']}\nContent: {doc['content']}" for doc in all_document_content ]) filter_prompt = f""" Filter the following data based on the specified criteria. Data to filter: {combined_content} Filter criteria: {criteria} Field to filter on: {field or 'All fields'} Please provide: 1. Filtered data that matches the criteria 2. Summary of filtering results 3. Number of items before and after filtering 4. Any data quality insights """ filtered_result = await self.serviceContainer.interfaceAiCalls.callAiTextAdvanced(filter_prompt) result_data = { "documentCount": len(chatDocuments), "criteria": criteria, "field": field, "filteredData": filtered_result, "originalCount": len(all_document_content), "timestamp": datetime.now(UTC).isoformat() } return self._createResult( success=True, data={ "documentName": f"filtered_data_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json", "documentData": result_data } ) except Exception as e: logger.error(f"Error filtering data: {str(e)}") return self._createResult( success=False, data={}, error=str(e) ) @action async def sort(self, parameters: Dict[str, Any]) -> ActionResult: """ Sort data by specified field Parameters: documentList (str): Reference to the document list to sort field (str): Field to sort by order (str, optional): Sort order (asc/desc, default: "asc") """ try: documentList = parameters.get("documentList") field = parameters.get("field") order = parameters.get("order", "asc") if not documentList or not field: return self._createResult( success=False, data={}, error="Document list reference and field are required" ) # Get documents from reference chatDocuments = self.serviceContainer.getChatDocumentsFromDocumentReference(documentList) if not chatDocuments: return self._createResult( success=False, data={}, error="No documents found for the provided reference" ) # Extract content from all documents all_document_content = [] for chatDocument in chatDocuments: fileId = chatDocument.fileId file_data = self.serviceContainer.getFileData(fileId) file_info = self.serviceContainer.getFileInfo(fileId) if not file_data: logger.warning(f"File data not found for fileId: {fileId}") continue all_document_content.append({ "fileId": fileId, "fileName": file_info.get('name', 'unknown'), "content": file_data }) if not all_document_content: return self._createResult( success=False, data={}, error="No content could be extracted from any documents" ) # Combine all document content for sorting combined_content = "\n\n--- DOCUMENT SEPARATOR ---\n\n".join([ f"File: {doc['fileName']}\nContent: {doc['content']}" for doc in all_document_content ]) # Create sorting prompt sort_prompt = f""" Sort the following data by the specified field. Data to sort: {combined_content} Sort field: {field} Sort order: {order} Please provide: 1. Sorted data in the specified order 2. Summary of sorting results 3. Any data insights from the sorting 4. Validation of sort field existence """ # Use AI to perform sorting sorted_result = await self.serviceContainer.interfaceAiCalls.callAiTextAdvanced(sort_prompt) # Create result data result_data = { "documentCount": len(chatDocuments), "field": field, "order": order, "sortedData": sorted_result, "timestamp": datetime.now(UTC).isoformat() } return self._createResult( success=True, data={ "documentName": f"sorted_data_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json", "documentData": result_data } ) except Exception as e: logger.error(f"Error sorting data: {str(e)}") return self._createResult( success=False, data={}, error=str(e) ) @action async def transform(self, parameters: Dict[str, Any]) -> ActionResult: """ Transform data structure or format Parameters: documentList (str): Reference to the document list to transform transformation (Dict[str, Any]): Transformation rules outputFormat (str, optional): Desired output format """ try: documentList = parameters.get("documentList") transformation = parameters.get("transformation") outputFormat = parameters.get("outputFormat", "json") if not documentList or not transformation: return self._createResult( success=False, data={}, error="Document list reference and transformation rules are required" ) # Get documents from reference chatDocuments = self.serviceContainer.getChatDocumentsFromDocumentReference(documentList) if not chatDocuments: return self._createResult( success=False, data={}, error="No documents found for the provided reference" ) # Extract content from all documents all_document_content = [] for chatDocument in chatDocuments: fileId = chatDocument.fileId file_data = self.serviceContainer.getFileData(fileId) file_info = self.serviceContainer.getFileInfo(fileId) if not file_data: logger.warning(f"File data not found for fileId: {fileId}") continue all_document_content.append({ "fileId": fileId, "fileName": file_info.get('name', 'unknown'), "content": file_data }) if not all_document_content: return self._createResult( success=False, data={}, error="No content could be extracted from any documents" ) # Combine all document content for transformation combined_content = "\n\n--- DOCUMENT SEPARATOR ---\n\n".join([ f"File: {doc['fileName']}\nContent: {doc['content']}" for doc in all_document_content ]) # Create transformation prompt transform_prompt = f""" Transform the following data according to the specified rules. Data to transform: {combined_content} Transformation rules: {transformation} Output format: {outputFormat} Please provide: 1. Transformed data in the specified format 2. Summary of transformation results 3. Validation of transformation rules 4. Any data quality improvements """ # Use AI to perform transformation transformed_result = await self.serviceContainer.interfaceAiCalls.callAiTextAdvanced(transform_prompt) # Create result data result_data = { "documentCount": len(chatDocuments), "transformation": transformation, "outputFormat": outputFormat, "transformedData": transformed_result, "timestamp": datetime.now(UTC).isoformat() } return self._createResult( success=True, data={ "documentName": f"transformed_data_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.{outputFormat}", "documentData": result_data } ) except Exception as e: logger.error(f"Error transforming data: {str(e)}") return self._createResult( success=False, data={}, error=str(e) )