""" AI processing method module. Handles direct AI calls for any type of task. """ import time import logging from typing import Dict, Any, List, Optional from datetime import datetime, UTC from modules.workflows.methods.methodBase import MethodBase, action from modules.datamodels.datamodelChat import ActionResult, ActionDocument from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum from modules.datamodels.datamodelWorkflow import ExtractContentParameters from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy, ContentPart logger = logging.getLogger(__name__) class MethodAi(MethodBase): """AI processing methods.""" def __init__(self, services): super().__init__(services) self.name = "ai" self.description = "AI processing methods" def _format_timestamp_for_filename(self) -> str: """Format current timestamp as YYYYMMDD-hhmmss for filenames.""" return datetime.now(UTC).strftime("%Y%m%d-%H%M%S") @action async def process(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - Purpose: Universal AI document processing action - accepts MULTIPLE input documents in ANY format (docx, pdf, json, txt, xlsx, html, images, etc.) and processes them together with a prompt to produce MULTIPLE output documents in ANY specified format (via resultType). Use for document generation, format conversion, content transformation, analysis, summarization, translation, extraction, comparison, and any AI-powered document manipulation. - Input requirements: aiPrompt (required); optional documentList (can contain multiple documents in any format). - Output format: Multiple documents in the same format per call (via resultType: txt, json, pdf, docx, xlsx, pptx, png, jpg, etc.). The AI can generate multiple files based on the prompt (e.g., "create separate documents for each section"). Default: txt. - Key capabilities: Can process any number of input documents together, extract data from mixed formats, combine information, generate multiple output files, transform between formats, perform analysis/comparison/summarization on document sets. Parameters: - aiPrompt (str, required): Instruction for the AI describing what processing to perform. - documentList (list, optional): Document reference(s) in any format to use as input/context. - resultType (str, optional): Output file extension (txt, json, md, csv, xml, html, pdf, docx, xlsx, png, etc.). All output documents will use this format. Default: txt. """ try: # Init progress logger workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" operationId = f"ai_process_{workflowId}_{int(time.time())}" # Start progress tracking parentOperationId = parameters.get('parentOperationId') self.services.chat.progressLogStart( operationId, "Generate", "AI Processing", f"Format: {parameters.get('resultType', 'txt')}", parentOperationId=parentOperationId ) aiPrompt = parameters.get("aiPrompt") logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})") # Update progress - preparing parameters self.services.chat.progressLogUpdate(operationId, 0.2, "Preparing parameters") from modules.datamodels.datamodelDocref import DocumentReferenceList documentListParam = parameters.get("documentList") # Convert to DocumentReferenceList if needed if documentListParam is None: documentList = DocumentReferenceList(references=[]) elif isinstance(documentListParam, DocumentReferenceList): documentList = documentListParam elif isinstance(documentListParam, str): documentList = DocumentReferenceList.from_string_list([documentListParam]) elif isinstance(documentListParam, list): documentList = DocumentReferenceList.from_string_list(documentListParam) else: logger.error(f"Invalid documentList type: {type(documentListParam)}") documentList = DocumentReferenceList(references=[]) resultType = parameters.get("resultType", "txt") if not aiPrompt: logger.error(f"aiPrompt is missing or empty. Parameters: {parameters}") return ActionResult.isFailure( error="AI prompt is required" ) # Determine output extension and default MIME type without duplicating service logic normalized_result_type = (str(resultType).strip().lstrip('.').lower() or "txt") output_extension = f".{normalized_result_type}" output_mime_type = "application/octet-stream" # Prefer service-provided mimeType when available logger.info(f"Using result type: {resultType} -> {output_extension}") # Phase 7.3: Extract content first if documents provided, then use contentParts # Check if contentParts are already provided (preferred path) contentParts: Optional[List[ContentPart]] = None if "contentParts" in parameters: contentParts = parameters.get("contentParts") if contentParts and not isinstance(contentParts, list): # Try to extract from ContentExtracted if it's an ActionDocument if hasattr(contentParts, 'parts'): contentParts = contentParts.parts else: logger.warning(f"Invalid contentParts type: {type(contentParts)}, treating as empty") contentParts = None # If contentParts not provided but documentList is, extract content first if not contentParts and documentList.references: self.services.chat.progressLogUpdate(operationId, 0.3, "Extracting content from documents") # Get ChatDocuments chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList) if not chatDocuments: logger.warning("No documents found in documentList") else: logger.info(f"Extracting content from {len(chatDocuments)} documents") # Prepare extraction options (use defaults if not provided) extractionOptions = parameters.get("extractionOptions") if not extractionOptions: extractionOptions = ExtractionOptions( prompt="Extract all content from the document", mergeStrategy=MergeStrategy( mergeType="concatenate", groupBy="typeGroup", orderBy="id" ), processDocumentsIndividually=True ) # Extract content using extraction service with hierarchical progress logging # Pass operationId for per-document progress tracking extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions, operationId=operationId) # Combine all ContentParts from all extracted results contentParts = [] for extracted in extractedResults: if extracted.parts: contentParts.extend(extracted.parts) logger.info(f"Extracted {len(contentParts)} content parts from {len(extractedResults)} documents") # Update progress - preparing AI call self.services.chat.progressLogUpdate(operationId, 0.4, "Preparing AI call") # Build options with only resultFormat - let service layer handle all other parameters output_format = output_extension.replace('.', '') or 'txt' options = AiCallOptions( resultFormat=output_format # Removed all model parameters - service layer will analyze prompt and determine optimal parameters ) # Update progress - calling AI self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI") # Use unified callAiContent method with contentParts (extraction is now separate) aiResponse = await self.services.ai.callAiContent( prompt=aiPrompt, options=options, contentParts=contentParts, # Already extracted (or None if no documents) outputFormat=output_format, parentOperationId=operationId ) # Update progress - processing result self.services.chat.progressLogUpdate(operationId, 0.8, "Processing result") from modules.datamodels.datamodelChat import ActionDocument # Extract documents from AiResponse if aiResponse.documents and len(aiResponse.documents) > 0: action_documents = [] for doc in aiResponse.documents: validationMetadata = { "actionType": "ai.process", "resultType": normalized_result_type, "outputFormat": output_format, "hasDocuments": True, "documentCount": len(aiResponse.documents) } action_documents.append(ActionDocument( documentName=doc.documentName, documentData=doc.documentData, mimeType=doc.mimeType or output_mime_type, sourceJson=getattr(doc, 'sourceJson', None), # Preserve source JSON for structure validation validationMetadata=validationMetadata )) final_documents = action_documents else: # Text response - create document from content extension = output_extension.lstrip('.') meaningful_name = self._generateMeaningfulFileName( base_name="ai", extension=extension, action_name="result" ) validationMetadata = { "actionType": "ai.process", "resultType": normalized_result_type, "outputFormat": output_format, "hasDocuments": False, "contentType": "text" } action_document = ActionDocument( documentName=meaningful_name, documentData=aiResponse.content, mimeType=output_mime_type, validationMetadata=validationMetadata ) final_documents = [action_document] # Complete progress tracking self.services.chat.progressLogFinish(operationId, True) return ActionResult.isSuccess(documents=final_documents) except Exception as e: logger.error(f"Error in AI processing: {str(e)}") # Complete progress tracking with failure try: self.services.chat.progressLogFinish(operationId, False) except: pass # Don't fail on progress logging errors return ActionResult.isFailure( error=str(e) ) @action async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - Purpose: Web research with two-step process: search for URLs, then crawl content. - Input requirements: prompt (required); optional list(url), country, language, researchDepth. - Output format: JSON with research results including URLs and content. Parameters: - prompt (str, required): Natural language research instruction. - urlList (list, optional): Specific URLs to crawl, if needed. - country (str, optional): Two-digit country code (lowercase, e.g., ch, us, de). - language (str, optional): Language code (lowercase, e.g., de, en, fr). - researchDepth (str, optional): Research depth - fast, general, or deep. Default: general. """ try: prompt = parameters.get("prompt") if not prompt: return ActionResult.isFailure(error="Research prompt is required") # Init progress logger workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" operationId = f"web_research_{workflowId}_{int(time.time())}" # Start progress tracking parentOperationId = parameters.get('parentOperationId') self.services.chat.progressLogStart( operationId, "Web Research", "Searching and Crawling", "Extracting URLs and Content", parentOperationId=parentOperationId ) # Call webcrawl service - service handles all AI intention analysis and processing result = await self.services.web.performWebResearch( prompt=prompt, urls=parameters.get("urlList", []), country=parameters.get("country"), language=parameters.get("language"), researchDepth=parameters.get("researchDepth", "general"), operationId=operationId ) # Complete progress tracking self.services.chat.progressLogFinish(operationId, True) # Get meaningful filename from research result (generated by intent analyzer) suggestedFilename = result.get("suggested_filename") if suggestedFilename: # Clean and validate filename import re cleaned = suggestedFilename.strip().strip('"\'') cleaned = cleaned.replace('\n', ' ').replace('\r', ' ').strip() # Ensure it doesn't already have extension if cleaned.lower().endswith('.json'): cleaned = cleaned[:-5] # Validate: should be reasonable length and contain only safe characters if cleaned and len(cleaned) <= 60 and re.match(r'^[a-zA-Z0-9_\-]+$', cleaned): meaningfulName = f"{cleaned}.json" else: # Fallback to generic meaningful filename meaningfulName = self._generateMeaningfulFileName( base_name="web_research", extension="json", action_name="research" ) else: # Fallback to generic meaningful filename meaningfulName = self._generateMeaningfulFileName( base_name="web_research", extension="json", action_name="research" ) from modules.datamodels.datamodelChat import ActionDocument validationMetadata = { "actionType": "ai.webResearch", "prompt": prompt, "urlList": parameters.get("urlList", []), "country": parameters.get("country"), "language": parameters.get("language"), "researchDepth": parameters.get("researchDepth", "general"), "resultFormat": "json" } actionDocument = ActionDocument( documentName=meaningfulName, documentData=result, mimeType="application/json", validationMetadata=validationMetadata ) return ActionResult.isSuccess(documents=[actionDocument]) except Exception as e: logger.error(f"Error in web research: {str(e)}") try: self.services.chat.progressLogFinish(operationId, False) except: pass return ActionResult.isFailure(error=str(e)) # ============================================================================ # Document Transformation Wrappers # ============================================================================ @action async def summarizeDocument(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - Purpose: Summarize one or more documents, extracting key points and main ideas. - Input requirements: documentList (required); optional summaryLength, focus. - Output format: Text document with summary (default: txt, can be overridden with resultType). Parameters: - documentList (list, required): Document reference(s) to summarize. - summaryLength (str, optional): Desired summary length - brief, medium, or detailed. Default: medium. - focus (str, optional): Specific aspect to focus on in the summary (e.g., "financial data", "key decisions"). - resultType (str, optional): Output file extension (txt, md, docx, etc.). Default: txt. """ documentList = parameters.get("documentList", []) if not documentList: return ActionResult.isFailure(error="documentList is required") summaryLength = parameters.get("summaryLength", "medium") focus = parameters.get("focus") resultType = parameters.get("resultType", "txt") lengthInstructions = { "brief": "Create a brief summary (2-3 paragraphs)", "medium": "Create a medium-length summary (comprehensive but concise)", "detailed": "Create a detailed summary covering all major points" } lengthInstruction = lengthInstructions.get(summaryLength.lower(), lengthInstructions["medium"]) aiPrompt = f"Summarize the provided document(s). {lengthInstruction}." if focus: aiPrompt += f" Focus specifically on: {focus}." aiPrompt += " Extract and present the key points, main ideas, and important information in a clear, well-structured format." return await self.process({ "aiPrompt": aiPrompt, "documentList": documentList, "resultType": resultType }) @action async def translateDocument(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - Purpose: Translate documents to a target language while preserving formatting and structure. - Input requirements: documentList (required); targetLanguage (required). - Output format: Translated document in same format as input (default) or specified resultType. Parameters: - documentList (list, required): Document reference(s) to translate. - targetLanguage (str, required): Target language code or name (e.g., "de", "German", "French", "es"). - sourceLanguage (str, optional): Source language if known (e.g., "en", "English"). If not provided, AI will detect. - preserveFormatting (bool, optional): Whether to preserve original formatting. Default: True. - resultType (str, optional): Output file extension. If not specified, uses same format as input. """ documentList = parameters.get("documentList", []) if not documentList: return ActionResult.isFailure(error="documentList is required") targetLanguage = parameters.get("targetLanguage") if not targetLanguage: return ActionResult.isFailure(error="targetLanguage is required") sourceLanguage = parameters.get("sourceLanguage") preserveFormatting = parameters.get("preserveFormatting", True) resultType = parameters.get("resultType") aiPrompt = f"Translate the provided document(s) to {targetLanguage}." if sourceLanguage: aiPrompt += f" The source language is {sourceLanguage}." if preserveFormatting: aiPrompt += " Preserve all formatting, structure, tables, and layout exactly as they appear in the original document." else: aiPrompt += " Focus on accurate translation of content." aiPrompt += " Maintain the same document structure, headings, and organization." processParams = { "aiPrompt": aiPrompt, "documentList": documentList } if resultType: processParams["resultType"] = resultType return await self.process(processParams) @action async def convert(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - Purpose: Convert documents/data between different formats with specific formatting options (e.g., JSON→CSV with custom columns, delimiters). - Input requirements: documentList (required); inputFormat and outputFormat (required). - Output format: Document in target format with specified formatting options. - CRITICAL: If input is already in standardized JSON format, uses automatic rendering system (no AI call needed). Parameters: - documentList (list, required): Document reference(s) to convert. - inputFormat (str, required): Source format (json, csv, xlsx, txt, etc.). - outputFormat (str, required): Target format (csv, json, xlsx, txt, etc.). - columnsPerRow (int, optional): For CSV output, number of columns per row. Default: auto-detect. - delimiter (str, optional): For CSV output, delimiter character. Default: comma (,). - includeHeader (bool, optional): For CSV output, whether to include header row. Default: True. - language (str, optional): Language for output (e.g., 'de', 'en', 'fr'). Default: 'en'. """ documentList = parameters.get("documentList", []) if not documentList: return ActionResult.isFailure(error="documentList is required") inputFormat = parameters.get("inputFormat") outputFormat = parameters.get("outputFormat") if not inputFormat or not outputFormat: return ActionResult.isFailure(error="inputFormat and outputFormat are required") # Normalize formats (remove leading dot if present) normalizedInputFormat = inputFormat.strip().lstrip('.').lower() normalizedOutputFormat = outputFormat.strip().lstrip('.').lower() # Get documents from modules.datamodels.datamodelDocref import DocumentReferenceList if isinstance(documentList, DocumentReferenceList): docRefList = documentList elif isinstance(documentList, list): docRefList = DocumentReferenceList.from_string_list(documentList) else: docRefList = DocumentReferenceList.from_string_list([documentList]) chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docRefList) if not chatDocuments: return ActionResult.isFailure(error="No documents found in documentList") # Check if input is standardized JSON format - if so, use direct rendering if normalizedInputFormat == "json" and len(chatDocuments) == 1: try: import json doc = chatDocuments[0] # ChatDocument doesn't have documentData - need to load file content using fileId docBytes = self.services.chat.getFileData(doc.fileId) if not docBytes: raise ValueError(f"No file data found for fileId={doc.fileId}") # Decode bytes to string docData = docBytes.decode('utf-8') # Try to parse as JSON if isinstance(docData, str): jsonData = json.loads(docData) elif isinstance(docData, dict): jsonData = docData else: jsonData = None # Check if it's standardized JSON format (has "documents" or "sections") if jsonData and (isinstance(jsonData, dict) and ("documents" in jsonData or "sections" in jsonData)): # Use direct rendering - no AI call needed! from modules.services.serviceGeneration.mainServiceGeneration import GenerationService generationService = GenerationService(self.services) # Ensure format is "documents" array if "documents" not in jsonData: jsonData = {"documents": [{"sections": jsonData.get("sections", []), "metadata": jsonData.get("metadata", {})}]} # Get title title = jsonData.get("metadata", {}).get("title", doc.documentName or "Converted Document") # Render with options renderOptions = {} if normalizedOutputFormat == "csv": renderOptions["delimiter"] = parameters.get("delimiter", ",") renderOptions["columnsPerRow"] = parameters.get("columnsPerRow") renderOptions["includeHeader"] = parameters.get("includeHeader", True) rendered_content, mime_type = await generationService.renderReport( jsonData, normalizedOutputFormat, title, None, None ) # Apply CSV options if needed (renderer will handle them) if normalizedOutputFormat == "csv" and renderOptions: rendered_content = self._applyCsvOptions(rendered_content, renderOptions) from modules.datamodels.datamodelChat import ActionDocument validationMetadata = { "actionType": "ai.convert", "inputFormat": normalizedInputFormat, "outputFormat": normalizedOutputFormat, "hasSourceJson": True, "conversionType": "direct_rendering" } actionDoc = ActionDocument( documentName=f"{doc.documentName.rsplit('.', 1)[0] if '.' in doc.documentName else doc.documentName}.{normalizedOutputFormat}", documentData=rendered_content, mimeType=mime_type, sourceJson=jsonData, # Preserve source JSON for structure validation validationMetadata=validationMetadata ) return ActionResult.isSuccess(documents=[actionDoc]) except Exception as e: logger.warning(f"Direct rendering failed, falling back to AI conversion: {str(e)}") # Fall through to AI-based conversion # Fallback: Use AI for conversion (for non-JSON inputs or complex conversions) columnsPerRow = parameters.get("columnsPerRow") delimiter = parameters.get("delimiter", ",") includeHeader = parameters.get("includeHeader", True) language = parameters.get("language", "en") aiPrompt = f"Convert the provided document(s) from {normalizedInputFormat.upper()} format to {normalizedOutputFormat.upper()} format." if normalizedOutputFormat == "csv": aiPrompt += f" Use '{delimiter}' as the delimiter character." if columnsPerRow: aiPrompt += f" Format the output with {columnsPerRow} columns per row." if not includeHeader: aiPrompt += " Do not include a header row." else: aiPrompt += " Include a header row with column names." if language and language != "en": aiPrompt += f" Use language: {language}." aiPrompt += " Preserve all data and ensure accurate conversion. Maintain data integrity and structure." return await self.process({ "aiPrompt": aiPrompt, "documentList": documentList, "resultType": normalizedOutputFormat }) def _applyCsvOptions(self, csvContent: str, options: Dict[str, Any]) -> str: """Apply CSV formatting options to rendered CSV content.""" delimiter = options.get("delimiter", ",") columnsPerRow = options.get("columnsPerRow") includeHeader = options.get("includeHeader", True) # Check if any options need to be applied needsProcessing = (delimiter != ",") or (columnsPerRow is not None) or (not includeHeader) if not needsProcessing: return csvContent import csv import io # Re-read CSV with comma, write with new delimiter reader = csv.reader(io.StringIO(csvContent)) output = io.StringIO() writer = csv.writer(output, delimiter=delimiter) rows = list(reader) # Handle header if not includeHeader and rows: rows = rows[1:] # Skip header # Handle columnsPerRow if columnsPerRow: newRows = [] for row in rows: # Split row into chunks of columnsPerRow for i in range(0, len(row), columnsPerRow): chunk = row[i:i+columnsPerRow] # Pad to columnsPerRow if needed while len(chunk) < columnsPerRow: chunk.append("") newRows.append(chunk) rows = newRows for row in rows: writer.writerow(row) return output.getvalue() @action async def convertDocument(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - Purpose: Convert documents between different formats (PDF→Word, Excel→CSV, etc.). - Input requirements: documentList (required); targetFormat (required). - Output format: Document in target format. Parameters: - documentList (list, required): Document reference(s) to convert. - targetFormat (str, required): Target format extension (docx, pdf, xlsx, csv, txt, html, json, md, etc.). - preserveStructure (bool, optional): Whether to preserve document structure (headings, tables, etc.). Default: True. """ documentList = parameters.get("documentList", []) if not documentList: return ActionResult.isFailure(error="documentList is required") targetFormat = parameters.get("targetFormat") if not targetFormat: return ActionResult.isFailure(error="targetFormat is required") preserveStructure = parameters.get("preserveStructure", True) # Normalize format (remove leading dot if present) normalizedFormat = targetFormat.strip().lstrip('.').lower() aiPrompt = f"Convert the provided document(s) to {normalizedFormat.upper()} format." if preserveStructure: aiPrompt += " Preserve all document structure including headings, tables, formatting, lists, and layout." aiPrompt += " Ensure the converted document maintains the same content and information as the original." return await self.process({ "aiPrompt": aiPrompt, "documentList": documentList, "resultType": normalizedFormat }) @action async def extractData(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - Purpose: Extract structured data from documents (key-value pairs, entities, facts, etc.). - Input requirements: documentList (required); optional dataStructure, fields. - Output format: JSON by default, or specified resultType. Parameters: - documentList (list, required): Document reference(s) to extract data from. - dataStructure (str, optional): Desired data structure - flat, nested, or list. Default: nested. - fields (list, optional): Specific fields/properties to extract (e.g., ["name", "date", "amount"]). - resultType (str, optional): Output format (json, csv, xlsx, etc.). Default: json. """ documentList = parameters.get("documentList", []) if not documentList: return ActionResult.isFailure(error="documentList is required") dataStructure = parameters.get("dataStructure", "nested") fields = parameters.get("fields", []) resultType = parameters.get("resultType", "json") aiPrompt = "Extract structured data from the provided document(s)." if fields: fieldsStr = ", ".join(fields) aiPrompt += f" Extract the following specific fields: {fieldsStr}." else: aiPrompt += " Extract all relevant data including names, dates, amounts, entities, and key information." structureInstructions = { "flat": "Use a flat key-value structure with simple properties.", "nested": "Use a nested JSON structure with logical grouping of related data.", "list": "Structure the data as a list/array of objects, one per entity or record." } aiPrompt += f" {structureInstructions.get(dataStructure.lower(), structureInstructions['nested'])}" aiPrompt += " Ensure all extracted data is accurate and complete." return await self.process({ "aiPrompt": aiPrompt, "documentList": documentList, "resultType": resultType }) # ============================================================================ # Content Generation Wrapper # ============================================================================ @action async def generateDocument(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - Purpose: Generate documents from scratch or based on templates/inputs. - Input requirements: prompt or description (required); optional documentList (for templates/references). - Output format: Document in specified format (default: docx). Parameters: - prompt (str, required): Description of the document to generate. - documentList (list, optional): Template documents or reference documents to use as a guide. - documentType (str, optional): Type of document - letter, memo, proposal, contract, etc. - resultType (str, optional): Output format (docx, pdf, txt, md, etc.). Default: docx. """ prompt = parameters.get("prompt") if not prompt: return ActionResult.isFailure(error="prompt is required") documentList = parameters.get("documentList", []) documentType = parameters.get("documentType") resultType = parameters.get("resultType", "docx") aiPrompt = f"Generate a document based on the following requirements: {prompt}" if documentType: aiPrompt += f" Document type: {documentType}." if documentList: aiPrompt += " Use the provided template/reference documents as a guide for structure, format, and style." aiPrompt += " Create a professional, well-structured document with appropriate formatting and organization." processParams = { "aiPrompt": aiPrompt, "resultType": resultType } if documentList: processParams["documentList"] = documentList return await self.process(processParams)