From 1bb02880df4d2fcd08af2d267d8d47a00ba3f95a Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Thu, 16 Oct 2025 22:52:58 +0200 Subject: [PATCH] Refactored standardized full stats and cost logging system over all components --- modules/datamodels/datamodelAi.py | 8 +- modules/datamodels/datamodelChat.py | 22 +- modules/interfaces/interfaceAiObjects.py | 312 +++++++++++++++--- modules/interfaces/interfaceDbChatObjects.py | 121 ++----- modules/routes/routeDataPrompts.py | 4 +- modules/routes/routeDataUsers.py | 4 +- modules/services/serviceAi/subCoreAi.py | 42 ++- .../mainServiceExtraction.py | 36 ++ .../mainServiceGeneration.py | 85 ++++- .../serviceWorkflow/mainServiceWorkflow.py | 65 ++-- .../methods/_EXCLUDED_methodDocument.py | 252 -------------- modules/workflows/workflowManager.py | 9 - 12 files changed, 491 insertions(+), 469 deletions(-) delete mode 100644 modules/workflows/methods/_EXCLUDED_methodDocument.py diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py index 41c434da..dba52e0c 100644 --- a/modules/datamodels/datamodelAi.py +++ b/modules/datamodels/datamodelAi.py @@ -131,8 +131,11 @@ class AiCallResponse(BaseModel): content: str = Field(description="AI response content") modelName: str = Field(description="Selected model name") - usedTokens: Optional[int] = Field(default=None, description="Estimated used tokens") - costEstimate: Optional[float] = Field(default=None, description="Estimated cost of the call") + priceUsd: float = Field(default=0.0, description="Calculated price in USD") + processingTime: float = Field(default=0.0, description="Duration in seconds") + bytesSent: int = Field(default=0, description="Input data size in bytes") + bytesReceived: int = Field(default=0, description="Output data size in bytes") + errorCount: int = Field(default=0, description="0 for success, 1+ for errors") class EnhancedAiCallOptions(AiCallOptions): @@ -160,4 +163,3 @@ class EnhancedAiCallOptions(AiCallOptions): description="Separator between chunks in merged output" ) - diff --git a/modules/datamodels/datamodelChat.py b/modules/datamodels/datamodelChat.py index 198360bf..c75ea70b 100644 --- a/modules/datamodels/datamodelChat.py +++ b/modules/datamodels/datamodelChat.py @@ -15,17 +15,15 @@ class ChatStat(BaseModel, ModelMixin): workflowId: Optional[str] = Field( None, description="Foreign key to workflow (for workflow stats)" ) - messageId: Optional[str] = Field( - None, description="Foreign key to message (for message stats)" - ) processingTime: Optional[float] = Field( None, description="Processing time in seconds" ) - tokenCount: Optional[int] = Field(None, description="Number of tokens processed") bytesSent: Optional[int] = Field(None, description="Number of bytes sent") bytesReceived: Optional[int] = Field(None, description="Number of bytes received") - successRate: Optional[float] = Field(None, description="Success rate of operations") errorCount: Optional[int] = Field(None, description="Number of errors encountered") + process: Optional[str] = Field(None, description="The process that delivers the stats data (e.g. 'action.outlook.readMails', 'ai.process.document.name')") + engine: Optional[str] = Field(None, description="The engine used (e.g. 'ai.anthropic.35', 'ai.tavily.basic', 'renderer.docx')") + priceUsd: Optional[float] = Field(None, description="Calculated price in USD for the operation") register_model_labels( @@ -34,13 +32,13 @@ register_model_labels( { "id": {"en": "ID", "fr": "ID"}, "workflowId": {"en": "Workflow ID", "fr": "ID du workflow"}, - "messageId": {"en": "Message ID", "fr": "ID du message"}, "processingTime": {"en": "Processing Time", "fr": "Temps de traitement"}, - "tokenCount": {"en": "Token Count", "fr": "Nombre de tokens"}, "bytesSent": {"en": "Bytes Sent", "fr": "Octets envoyés"}, "bytesReceived": {"en": "Bytes Received", "fr": "Octets reçus"}, - "successRate": {"en": "Success Rate", "fr": "Taux de succès"}, "errorCount": {"en": "Error Count", "fr": "Nombre d'erreurs"}, + "process": {"en": "Process", "fr": "Processus"}, + "engine": {"en": "Engine", "fr": "Moteur"}, + "priceUsd": {"en": "Price USD", "fr": "Prix USD"}, }, ) @@ -214,7 +212,6 @@ class ChatMessage(BaseModel, ModelMixin): default_factory=get_utc_timestamp, description="When the message was published (UTC timestamp in seconds)", ) - stats: Optional[ChatStat] = Field(None, description="Statistics for this message") success: Optional[bool] = Field( None, description="Whether the message processing was successful" ) @@ -253,7 +250,6 @@ register_model_labels( "status": {"en": "Status", "fr": "Statut"}, "sequenceNr": {"en": "Sequence Number", "fr": "Numéro de séquence"}, "publishedAt": {"en": "Published At", "fr": "Publié le"}, - "stats": {"en": "Statistics", "fr": "Statistiques"}, "success": {"en": "Success", "fr": "Succès"}, "actionId": {"en": "Action ID", "fr": "ID de l'action"}, "actionMethod": {"en": "Action Method", "fr": "Méthode de l'action"}, @@ -362,9 +358,9 @@ class ChatWorkflow(BaseModel, ModelMixin): frontend_readonly=True, frontend_required=False, ) - stats: Optional[ChatStat] = Field( - None, - description="Workflow statistics", + stats: List[ChatStat] = Field( + default_factory=list, + description="Workflow statistics list", frontend_type="text", frontend_readonly=True, frontend_required=False, diff --git a/modules/interfaces/interfaceAiObjects.py b/modules/interfaces/interfaceAiObjects.py index 7c3c5ce2..d272fc41 100644 --- a/modules/interfaces/interfaceAiObjects.py +++ b/modules/interfaces/interfaceAiObjects.py @@ -2,6 +2,7 @@ import logging import asyncio from typing import Dict, Any, List, Union, Tuple, Optional from dataclasses import dataclass +import time logger = logging.getLogger(__name__) @@ -44,7 +45,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 8, "qualityRating": 9, "capabilities": ["text_generation", "chat", "reasoning", "analysis"], - "tags": ["text", "chat", "reasoning", "analysis", "general"] + "tags": ["text", "chat", "reasoning", "analysis", "general"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.03 + (bytesReceived / 4 / 1000) * 0.06 }, "openai_callAiBasic_gpt35": { "connector": "openai", @@ -56,7 +58,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 9, "qualityRating": 7, "capabilities": ["text_generation", "chat", "reasoning"], - "tags": ["text", "chat", "reasoning", "general", "fast"] + "tags": ["text", "chat", "reasoning", "general", "fast"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.0015 + (bytesReceived / 4 / 1000) * 0.002 }, "openai_callAiImage": { "connector": "openai", @@ -68,7 +71,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 7, "qualityRating": 9, "capabilities": ["image_analysis", "vision", "multimodal"], - "tags": ["image", "vision", "multimodal"] + "tags": ["image", "vision", "multimodal"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.03 + (bytesReceived / 4 / 1000) * 0.06 }, "openai_generateImage": { "connector": "openai", @@ -80,7 +84,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 6, "qualityRating": 9, "capabilities": ["image_generation", "art", "visual_creation"], - "tags": ["image_generation", "art", "visual"] + "tags": ["image_generation", "art", "visual"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.04 }, # Anthropic Models @@ -94,7 +99,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 7, "qualityRating": 10, "capabilities": ["text_generation", "chat", "reasoning", "analysis"], - "tags": ["text", "chat", "reasoning", "analysis", "high_quality"] + "tags": ["text", "chat", "reasoning", "analysis", "high_quality"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.015 + (bytesReceived / 4 / 1000) * 0.075 }, "anthropic_callAiImage": { "connector": "anthropic", @@ -106,7 +112,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 7, "qualityRating": 10, "capabilities": ["image_analysis", "vision", "multimodal"], - "tags": ["image", "vision", "multimodal", "high_quality"] + "tags": ["image", "vision", "multimodal", "high_quality"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.015 + (bytesReceived / 4 / 1000) * 0.075 }, # Perplexity Models @@ -120,7 +127,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 8, "qualityRating": 8, "capabilities": ["text_generation", "chat", "reasoning", "web_search"], - "tags": ["text", "chat", "reasoning", "web_search", "cost_effective"] + "tags": ["text", "chat", "reasoning", "web_search", "cost_effective"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.005 + (bytesReceived / 4 / 1000) * 0.005 }, "perplexity_callAiWithWebSearch": { "connector": "perplexity", @@ -132,7 +140,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 7, "qualityRating": 9, "capabilities": ["text_generation", "web_search", "research"], - "tags": ["text", "web_search", "research", "high_quality"] + "tags": ["text", "web_search", "research", "high_quality"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.01 + (bytesReceived / 4 / 1000) * 0.01 }, "perplexity_researchTopic": { "connector": "perplexity", @@ -144,7 +153,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 8, "qualityRating": 8, "capabilities": ["web_search", "research", "information_gathering"], - "tags": ["web_search", "research", "information", "cost_effective"] + "tags": ["web_search", "research", "information", "cost_effective"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.002 + (bytesReceived / 4 / 1000) * 0.002 }, "perplexity_answerQuestion": { "connector": "perplexity", @@ -156,7 +166,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 8, "qualityRating": 8, "capabilities": ["web_search", "question_answering", "research"], - "tags": ["web_search", "qa", "research", "cost_effective"] + "tags": ["web_search", "qa", "research", "cost_effective"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.002 + (bytesReceived / 4 / 1000) * 0.002 }, "perplexity_getCurrentNews": { "connector": "perplexity", @@ -168,7 +179,8 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 8, "qualityRating": 8, "capabilities": ["web_search", "news", "current_events"], - "tags": ["web_search", "news", "current_events", "cost_effective"] + "tags": ["web_search", "news", "current_events", "cost_effective"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.002 + (bytesReceived / 4 / 1000) * 0.002 }, # Tavily Web Models @@ -177,16 +189,21 @@ aiModels: Dict[str, Dict[str, Any]] = { "function": "search", "llmName": "tavily-search", "contextLength": 0, - "costPer1kTokens": 0.0, - "costPer1kTokensOutput": 0.0, + "costPer1kTokens": 0.0, # Not token-based + "costPer1kTokensOutput": 0.0, # Not token-based "speedRating": 8, "qualityRating": 8, "capabilities": ["web_search", "information_retrieval", "url_discovery"], - "tags": ["web", "search", "urls", "information"] + "tags": ["web", "search", "urls", "information"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived, searchDepth="basic", numRequests=1: ( + # Basic search: 1 credit, Advanced: 2 credits + # Cost per credit: $0.008 + numRequests * (1 if searchDepth == "basic" else 2) * 0.008 + ) }, - "tavily_crawl": { + "tavily_extract": { "connector": "tavily", - "function": "crawl", + "function": "extract", "llmName": "tavily-extract", "contextLength": 0, "costPer1kTokens": 0.0, @@ -194,7 +211,31 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 6, "qualityRating": 8, "capabilities": ["web_crawling", "content_extraction", "text_extraction"], - "tags": ["web", "crawl", "extract", "content"] + "tags": ["web", "extract", "content"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived, extractionDepth="basic", numSuccessfulUrls=1: ( + # Basic: 1 credit per 5 URLs, Advanced: 2 credits per 5 URLs + # Only charged for successful extractions + (numSuccessfulUrls / 5) * (1 if extractionDepth == "basic" else 2) * 0.008 + ) + }, + "tavily_crawl": { + "connector": "tavily", + "function": "crawl", + "llmName": "tavily-crawl", + "contextLength": 0, + "costPer1kTokens": 0.0, + "costPer1kTokensOutput": 0.0, + "speedRating": 6, + "qualityRating": 8, + "capabilities": ["web_crawling", "content_extraction", "mapping"], + "tags": ["web", "crawl", "map", "extract"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived, numPages=10, extractionDepth="basic", withInstructions=False, numSuccessfulExtractions=10: ( + # Crawl = Mapping + Extraction + # Mapping: 1 credit per 10 pages (2 if with instructions) + # Extraction: 1 credit per 5 successful extractions (2 if advanced) + ((numPages / 10) * (2 if withInstructions else 1) + + (numSuccessfulExtractions / 5) * (1 if extractionDepth == "basic" else 2)) * 0.008 + ) }, "tavily_scrape": { "connector": "tavily", @@ -206,7 +247,54 @@ aiModels: Dict[str, Dict[str, Any]] = { "speedRating": 6, "qualityRating": 8, "capabilities": ["web_search", "web_crawling", "content_extraction", "information_retrieval"], - "tags": ["web", "search", "crawl", "extract", "content", "information"] + "tags": ["web", "search", "crawl", "extract", "content", "information"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived, searchDepth="basic", numSuccessfulUrls=1, extractionDepth="basic": ( + # Combines search + extraction + # Search cost + extraction cost + (1 if searchDepth == "basic" else 2) + + (numSuccessfulUrls / 5) * (1 if extractionDepth == "basic" else 2) + ) * 0.008 + }, + + # Internal Models + "internal_extraction": { + "connector": "internal", + "function": "extract", + "llmName": "internal-extractor", + "contextLength": 0, + "costPer1kTokens": 0.0, + "costPer1kTokensOutput": 0.0, + "speedRating": 8, + "qualityRating": 8, + "capabilities": ["document_extraction", "content_processing"], + "tags": ["internal", "extraction", "document_processing"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: 0.001 + (bytesSent + bytesReceived) / (1024 * 1024) * 0.01 # $0.001 base + $0.01/MB + }, + "internal_generation": { + "connector": "internal", + "function": "generate", + "llmName": "internal-generator", + "contextLength": 0, + "costPer1kTokens": 0.0, + "costPer1kTokensOutput": 0.0, + "speedRating": 7, + "qualityRating": 8, + "capabilities": ["document_generation", "content_creation"], + "tags": ["internal", "generation", "document_creation"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: 0.002 + (bytesReceived / (1024 * 1024)) * 0.005 # $0.002 base + $0.005/MB output + }, + "internal_rendering": { + "connector": "internal", + "function": "render", + "llmName": "internal-renderer", + "contextLength": 0, + "costPer1kTokens": 0.0, + "costPer1kTokensOutput": 0.0, + "speedRating": 6, + "qualityRating": 9, + "capabilities": ["document_rendering", "format_conversion"], + "tags": ["internal", "rendering", "format_conversion"], + "calculatePriceUsd": lambda processingTime, bytesSent, bytesReceived: 0.003 + (bytesReceived / (1024 * 1024)) * 0.008 # $0.003 base + $0.008/MB output } } @@ -251,6 +339,7 @@ class AiObjects: outputCost = (estimatedTokens / 1000) * modelInfo["costPer1kTokensOutput"] * 0.1 return inputCost + outputCost + def _selectModel(self, prompt: str, context: str, options: AiCallOptions) -> str: """Select the best model based on operation type, tags, and requirements.""" totalSize = len(prompt.encode("utf-8")) + len(context.encode("utf-8")) @@ -398,10 +487,14 @@ class AiObjects: async def call(self, request: AiCallRequest) -> AiCallResponse: """Call AI model for text generation with fallback mechanism.""" + prompt = request.prompt context = request.context or "" options = request.options + # Calculate input bytes + inputBytes = len((prompt + context).encode("utf-8")) + # Compress optionally (prompt/context) - simple truncation fallback kept here def maybeTruncate(text: str, limit: int) -> str: data = text.encode("utf-8") @@ -439,6 +532,9 @@ class AiObjects: try: logger.info(f"Attempting AI call with model: {modelName} (attempt {attempt + 1}/{len(fallbackModels)})") + # Start timing + startTime = time.time() + connector = self._connectorFor(modelName) functionName = aiModels[modelName]["function"] @@ -469,13 +565,24 @@ class AiObjects: else: raise ValueError(f"Function {functionName} not supported for text generation") - # Success! Estimate cost/tokens and return - totalSize = len((prompt + context).encode("utf-8")) - cost = self._estimateCost(aiModels[modelName], totalSize) - usedTokens = int(totalSize / 4) + # Calculate timing and output bytes + endTime = time.time() + processingTime = endTime - startTime + outputBytes = len(content.encode("utf-8")) + + # Calculate price + priceUsd = aiModels[modelName]["calculatePriceUsd"](processingTime, inputBytes, outputBytes) logger.info(f"✅ AI call successful with model: {modelName}") - return AiCallResponse(content=content, modelName=modelName, usedTokens=usedTokens, costEstimate=cost) + return AiCallResponse( + content=content, + modelName=modelName, + priceUsd=priceUsd, + processingTime=processingTime, + bytesSent=inputBytes, + bytesReceived=outputBytes, + errorCount=0 + ) except Exception as e: lastError = e @@ -490,16 +597,28 @@ class AiObjects: logger.error(f"💥 All {len(fallbackModels)} models failed for operation {options.operationType}") break - # All fallback attempts failed + # All fallback attempts failed - return error response errorMsg = f"All AI models failed for operation {options.operationType}. Last error: {str(lastError)}" logger.error(errorMsg) - raise Exception(errorMsg) + return AiCallResponse( + content=errorMsg, + modelName="error", + priceUsd=0.0, + processingTime=0.0, + bytesSent=inputBytes, + bytesReceived=0, + errorCount=1 + ) - async def callImage(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None, options: AiCallOptions = None) -> str: + async def callImage(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None, options: AiCallOptions = None) -> AiCallResponse: """Call AI model for image analysis with fallback mechanism.""" + if options is None: options = AiCallOptions(operationType=OperationType.IMAGE_ANALYSIS) + # Calculate input bytes (prompt + image data) + inputBytes = len(prompt.encode("utf-8")) + len(imageData) if isinstance(imageData, bytes) else len(prompt.encode("utf-8")) + len(str(imageData).encode("utf-8")) + # Get fallback models for image analysis fallbackModels = self._getFallbackModels(OperationType.IMAGE_ANALYSIS) @@ -509,13 +628,33 @@ class AiObjects: try: logger.info(f"Attempting image analysis with model: {modelName} (attempt {attempt + 1}/{len(fallbackModels)})") + # Start timing + startTime = time.time() + connector = self._connectorFor(modelName) functionName = aiModels[modelName]["function"] if functionName == "callAiImage": content = await connector.callAiImage(prompt, imageData, mimeType) + + # Calculate timing and output bytes + endTime = time.time() + processingTime = endTime - startTime + outputBytes = len(content.encode("utf-8")) + + # Calculate price + priceUsd = aiModels[modelName]["calculatePriceUsd"](processingTime, inputBytes, outputBytes) + logger.info(f"✅ Image analysis successful with model: {modelName}") - return content + return AiCallResponse( + content=content, + modelName=modelName, + priceUsd=priceUsd, + processingTime=processingTime, + bytesSent=inputBytes, + bytesReceived=outputBytes, + errorCount=0 + ) else: raise ValueError(f"Function {functionName} not supported for image analysis") @@ -532,32 +671,80 @@ class AiObjects: logger.error(f"💥 All {len(fallbackModels)} models failed for image analysis") break - # All fallback attempts failed + # All fallback attempts failed - return error response errorMsg = f"All AI models failed for image analysis. Last error: {str(lastError)}" logger.error(errorMsg) - raise Exception(errorMsg) + return AiCallResponse( + content=errorMsg, + modelName="error", + priceUsd=0.0, + processingTime=0.0, + bytesSent=inputBytes, + bytesReceived=0, + errorCount=1 + ) - async def generateImage(self, prompt: str, size: str = "1024x1024", quality: str = "standard", style: str = "vivid", options: AiCallOptions = None) -> Dict[str, Any]: + async def generateImage(self, prompt: str, size: str = "1024x1024", quality: str = "standard", style: str = "vivid", options: AiCallOptions = None) -> AiCallResponse: """Generate an image using AI.""" + if options is None: options = AiCallOptions(operationType=OperationType.IMAGE_GENERATION) + # Calculate input bytes + inputBytes = len(prompt.encode("utf-8")) + # Select model for image generation modelName = self._selectModel(prompt, "", options) - connector = self._connectorFor(modelName) - functionName = aiModels[modelName]["function"] - - if functionName == "generateImage": - return await connector.generateImage(prompt, size, quality, style) - elif functionName == "generateImageWithVariations": - results = await connector.generateImageWithVariations(prompt, 1, size, quality, style) - return results[0] if results else {} - elif functionName == "generateImageWithChat": - content = await connector.generateImageWithChat(prompt, size, quality, style) - return {"content": content, "success": True} - else: - raise ValueError(f"Function {functionName} not supported for image generation") + try: + # Start timing + startTime = time.time() + + connector = self._connectorFor(modelName) + functionName = aiModels[modelName]["function"] + + if functionName == "generateImage": + result = await connector.generateImage(prompt, size, quality, style) + content = str(result) + elif functionName == "generateImageWithVariations": + results = await connector.generateImageWithVariations(prompt, 1, size, quality, style) + result = results[0] if results else {} + content = str(result) + elif functionName == "generateImageWithChat": + content = await connector.generateImageWithChat(prompt, size, quality, style) + else: + raise ValueError(f"Function {functionName} not supported for image generation") + + # Calculate timing and output bytes + endTime = time.time() + processingTime = endTime - startTime + outputBytes = len(content.encode("utf-8")) + + # Calculate price + priceUsd = aiModels[modelName]["calculatePriceUsd"](processingTime, inputBytes, outputBytes) + + logger.info(f"✅ Image generation successful with model: {modelName}") + return AiCallResponse( + content=content, + modelName=modelName, + priceUsd=priceUsd, + processingTime=processingTime, + bytesSent=inputBytes, + bytesReceived=outputBytes, + errorCount=0 + ) + + except Exception as e: + logger.error(f"❌ Image generation failed with model {modelName}: {str(e)}") + return AiCallResponse( + content=f"Image generation failed: {str(e)}", + modelName=modelName, + priceUsd=0.0, + processingTime=0.0, + bytesSent=inputBytes, + bytesReceived=0, + errorCount=1 + ) # Web functionality methods - Simple interface to Tavily connector async def search_websites(self, query: str, max_results: int = 5, **kwargs) -> List[WebSearchResultItem]: @@ -921,11 +1108,15 @@ class AiObjects: logger.error(f"Crawling failed with error: {e}, returning partial results: {len(all_content)} pages crawled so far") return all_content - async def webQuery(self, query: str, context: str = "", options: AiCallOptions = None) -> str: + async def webQuery(self, query: str, context: str = "", options: AiCallOptions = None) -> AiCallResponse: """Use Perplexity AI to provide the best answers for web-related queries.""" + if options is None: options = AiCallOptions(operationType=OperationType.WEB_RESEARCH) + # Calculate input bytes + inputBytes = len((query + context).encode("utf-8")) + # Create a comprehensive prompt for web queries webPrompt = f"""You are an expert web researcher and information analyst. Please provide a comprehensive and accurate answer to the following web-related query. @@ -943,12 +1134,41 @@ Please provide: Format your response in a clear, professional manner that would be helpful for someone researching this topic.""" try: + # Start timing + startTime = time.time() + # Use Perplexity for web research with search capabilities response = await self.perplexityService.callAiWithWebSearch(webPrompt) - return response + + # Calculate timing and output bytes + endTime = time.time() + processingTime = endTime - startTime + outputBytes = len(response.encode("utf-8")) + + # Calculate price (use perplexity model pricing) + priceUsd = aiModels["perplexity_callAiWithWebSearch"]["calculatePriceUsd"](processingTime, inputBytes, outputBytes) + + logger.info(f"✅ Web query successful with Perplexity") + return AiCallResponse( + content=response, + modelName="perplexity_callAiWithWebSearch", + priceUsd=priceUsd, + processingTime=processingTime, + bytesSent=inputBytes, + bytesReceived=outputBytes, + errorCount=0 + ) except Exception as e: logger.error(f"Perplexity web query failed: {str(e)}") - raise Exception(f"Failed to process web query: {str(e)}") + return AiCallResponse( + content=f"Web query failed: {str(e)}", + modelName="perplexity_callAiWithWebSearch", + priceUsd=0.0, + processingTime=0.0, + bytesSent=inputBytes, + bytesReceived=0, + errorCount=1 + ) # Utility methods async def listAvailableModels(self, connectorType: str = None) -> List[Dict[str, Any]]: diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index 91d9b4a7..f2900bbf 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -237,7 +237,7 @@ class ChatObjects: # Load related data from normalized tables logs = self.getLogs(workflowId) messages = self.getMessages(workflowId) - stats = self.getWorkflowStats(workflowId) + stats = self.getStats(workflowId) # Validate workflow data against ChatWorkflow model return ChatWorkflow( @@ -294,7 +294,7 @@ class ChatObjects: startedAt=created.get("startedAt", currentTime), logs=[], messages=[], - stats=None, + stats=[], mandateId=created.get("mandateId", self.currentUser.mandateId), workflowMode=created.get("workflowMode", "Actionplan"), maxSteps=created.get("maxSteps", 1) @@ -325,7 +325,7 @@ class ChatObjects: # Load fresh data from normalized tables logs = self.getLogs(workflowId) messages = self.getMessages(workflowId) - stats = self.getWorkflowStats(workflowId) + stats = self.getStats(workflowId) # Convert to ChatWorkflow model return ChatWorkflow( @@ -433,7 +433,6 @@ class ChatObjects: status=msg.get("status", "step"), sequenceNr=msg.get("sequenceNr", 0), publishedAt=msg.get("publishedAt", get_utc_timestamp()), - stats=self.getMessageStats(msg["id"]), success=msg.get("success"), actionId=msg.get("actionId"), actionMethod=msg.get("actionMethod"), @@ -515,8 +514,6 @@ class ChatObjects: # Convert to dict if it's a Pydantic object if hasattr(doc_data, 'model_dump'): doc_dict = doc_data.model_dump() # Pydantic v2 - elif hasattr(doc_data, 'dict'): - doc_dict = doc_data.dict() # Pydantic v1 elif hasattr(doc_data, 'to_dict'): doc_dict = doc_data.to_dict() else: @@ -642,14 +639,6 @@ class ChatObjects: self.createDocument(doc_dict) except Exception as e: logger.error(f"Error updating message documents: {str(e)}") - if 'stats' in object_fields: - stats_data = object_fields['stats'] - try: - if stats_data: - stats_data["messageId"] = messageId - self.db.recordCreate(ChatStat, stats_data) - except Exception as e: - logger.error(f"Error updating message stats: {str(e)}") if not updatedMessage: logger.warning(f"Failed to update message {messageId}") @@ -853,91 +842,47 @@ class ChatObjects: # Stats methods - def getMessageStats(self, messageId: str) -> Optional[ChatStat]: - """Returns statistics for a message from normalized table.""" - try: - stats = self.db.getRecordset(ChatStat, recordFilter={"messageId": messageId}) - if not stats: - return None - # Return the most recent stats record - stats.sort(key=lambda x: x.get("created_at", ""), reverse=True) - return ChatStat(**stats[0]) - except Exception as e: - logger.error(f"Error getting message stats: {str(e)}") - return None - - def getWorkflowStats(self, workflowId: str) -> Optional[ChatStat]: - """Returns statistics for a workflow if user has access.""" + def getStats(self, workflowId: str) -> List[ChatStat]: + """Returns list of statistics for a workflow if user has access.""" # Check workflow access first (without calling getWorkflow to avoid circular reference) workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId}) if not workflows: - return None + return [] filteredWorkflows = self._uam(ChatWorkflow, workflows) if not filteredWorkflows: - return None + return [] # Get stats for this workflow from normalized table stats = self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId}) if not stats: - return None + return [] - # Return the most recent stats record - stats.sort(key=lambda x: x.get("created_at", ""), reverse=True) - return ChatStat(**stats[0]) + # Return all stats records sorted by creation time + stats.sort(key=lambda x: x.get("created_at", "")) + return [ChatStat(**stat) for stat in stats] - def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0, tokenCount: int = 0) -> None: - """ - Updates workflow statistics in the database. - - Args: - workflowId: ID of the workflow to update - bytesSent: Bytes sent (incremental) - bytesReceived: Bytes received (incremental) - tokenCount: Token count (incremental, default 0) - """ + + def createStat(self, statData: Dict[str, Any]) -> ChatStat: + """Creates a new stats record and returns it.""" try: - # Check workflow access first - workflow = self.getWorkflow(workflowId) - if not workflow: - logger.warning(f"No access to workflow {workflowId} for stats update") - return - - if not self._canModify(ChatWorkflow, workflowId): - logger.warning(f"No permission to modify workflow {workflowId} for stats update") - return + # Ensure workflowId is present in statData + if "workflowId" not in statData: + raise ValueError("workflowId is required in statData") - # Get existing stats or create new ones - existing_stats = self.getWorkflowStats(workflowId) + # Validate the stat data against ChatStat model + stat = ChatStat(**statData) - if existing_stats: - # Update existing stats - updated_stats = { - "bytesSent": (existing_stats.bytesSent or 0) + bytesSent, - "bytesReceived": (existing_stats.bytesReceived or 0) + bytesReceived, - "tokenCount": (existing_stats.tokenCount or 0) + tokenCount, - "lastUpdated": get_utc_timestamp() - } - - # Update the stats record - self.db.recordModify(ChatStat, existing_stats.id, updated_stats) - else: - # Create new stats record - new_stats = { - "workflowId": workflowId, - "bytesSent": bytesSent, - "bytesReceived": bytesReceived, - "tokenCount": tokenCount, - "lastUpdated": get_utc_timestamp() - } - - self.db.recordCreate(ChatStat, new_stats) - - logger.debug(f"Updated workflow stats for {workflowId}: +{bytesSent} sent, +{bytesReceived} received, +{tokenCount} tokens") + # Create the stat record in the database + created = self.db.recordCreate(ChatStat, stat.model_dump()) + # Return the created ChatStat + return ChatStat(**created) except Exception as e: - logger.error(f"Error updating workflow stats for {workflowId}: {str(e)}") + logger.error(f"Error creating workflow stat: {str(e)}") + raise + def getUnifiedChatData(self, workflowId: str, afterTimestamp: Optional[float] = None) -> Dict[str, Any]: """ @@ -979,7 +924,6 @@ class ChatObjects: status=msg.get("status", "step"), sequenceNr=msg.get("sequenceNr", 0), publishedAt=msg.get("publishedAt", get_utc_timestamp()), - stats=self.getMessageStats(msg["id"]), success=msg.get("success"), actionId=msg.get("actionId"), actionMethod=msg.get("actionMethod"), @@ -995,7 +939,7 @@ class ChatObjects: items.append({ "type": "message", "createdAt": msg_timestamp, - "item": chat_message.model_dump() if hasattr(chat_message, 'model_dump') else chat_message.dict() + "item": chat_message.model_dump() }) # Get logs @@ -1010,22 +954,21 @@ class ChatObjects: items.append({ "type": "log", "createdAt": log_timestamp, - "item": chat_log.model_dump() if hasattr(chat_log, 'model_dump') else chat_log.dict() + "item": chat_log.model_dump() }) - # Get stats - stats = self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId}) + # Get stats list + stats = self.getStats(workflowId) for stat in stats: # Apply timestamp filtering in Python - stat_timestamp = stat.get("_createdAt", get_utc_timestamp()) + stat_timestamp = stat.createdAt if hasattr(stat, 'createdAt') else get_utc_timestamp() if afterTimestamp is not None and stat_timestamp <= afterTimestamp: continue - chat_stat = ChatStat(**stat) items.append({ "type": "stat", "createdAt": stat_timestamp, - "item": chat_stat.model_dump() if hasattr(chat_stat, 'model_dump') else chat_stat.dict() + "item": stat.model_dump() }) # Sort all items by createdAt timestamp for chronological order diff --git a/modules/routes/routeDataPrompts.py b/modules/routes/routeDataPrompts.py index 939d2271..97da7846 100644 --- a/modules/routes/routeDataPrompts.py +++ b/modules/routes/routeDataPrompts.py @@ -49,7 +49,7 @@ async def create_prompt( managementInterface = interfaceDbComponentObjects.getInterface(currentUser) # Convert Prompt to dict for interface - prompt_data = prompt.model_dump() if hasattr(prompt, "model_dump") else prompt.dict() + prompt_data = prompt.model_dump() # Create prompt newPrompt = managementInterface.createPrompt(prompt_data) @@ -99,7 +99,7 @@ async def update_prompt( if hasattr(promptData, "model_dump"): update_data = promptData.model_dump(exclude={"id"}) else: - update_data = promptData.dict(exclude={"id"}) + update_data = promptData.model_dump(exclude={"id"}) # Update prompt updatedPrompt = managementInterface.updatePrompt(promptId, update_data) diff --git a/modules/routes/routeDataUsers.py b/modules/routes/routeDataUsers.py index e44178a3..578eeeb0 100644 --- a/modules/routes/routeDataUsers.py +++ b/modules/routes/routeDataUsers.py @@ -93,7 +93,7 @@ async def create_user( appInterface = interfaceDbAppObjects.getInterface(currentUser) # Convert User to dict for interface - user_dict = user_data.model_dump() if hasattr(user_data, "model_dump") else user_data.dict() + user_dict = user_data.model_dump() # Create user newUser = appInterface.createUser(user_dict) @@ -120,7 +120,7 @@ async def update_user( ) # Convert User to dict for interface - update_data = userData.model_dump() if hasattr(userData, "model_dump") else userData.dict() + update_data = userData.model_dump() # Update user updatedUser = appInterface.updateUser(userId, update_data) diff --git a/modules/services/serviceAi/subCoreAi.py b/modules/services/serviceAi/subCoreAi.py index 5d7a94ac..302d9713 100644 --- a/modules/services/serviceAi/subCoreAi.py +++ b/modules/services/serviceAi/subCoreAi.py @@ -133,6 +133,13 @@ class SubCoreAi: ) response = await self.aiObjects.call(request) result = response.content + + # Emit stats for direct AI call + self.services.workflow.storeWorkflowStat( + self.services.workflow, + response, + f"ai.call.{options.operationType}" + ) # Log AI response for debugging (additional logging for text calls) try: @@ -176,10 +183,20 @@ class SubCoreAi: self.services.utils.debugLogToFile(f"Calling aiObjects.callImage with operationType: {options.operationType}", "AI_SERVICE") logger.info(f"Calling aiObjects.callImage with operationType: {options.operationType}") - result = await self.aiObjects.callImage(prompt, imageData, mimeType, options) + response = await self.aiObjects.callImage(prompt, imageData, mimeType, options) + + # Emit stats for image analysis + self.services.workflow.storeWorkflowStat( + self.services.workflow, + response, + f"ai.image.{options.operationType}" + ) # Debug the result - self.services.utils.debugLogToFile(f"Raw AI result type: {type(result)}, value: {repr(result)}", "AI_SERVICE") + self.services.utils.debugLogToFile(f"Raw AI result type: {type(response)}, value: {repr(response)}", "AI_SERVICE") + + # Extract content from response + result = response.content if hasattr(response, 'content') else str(response) # Check if result is valid if not result or (isinstance(result, str) and not result.strip()): @@ -207,7 +224,26 @@ class SubCoreAi: ) -> Dict[str, Any]: """Generate an image using AI using interface.generateImage().""" try: - return await self.aiObjects.generateImage(prompt, size, quality, style, options) + response = await self.aiObjects.generateImage(prompt, size, quality, style, options) + + # Emit stats for image generation + self.services.workflow.storeWorkflowStat( + self.services.workflow, + response, + f"ai.generate.image" + ) + + # Convert response to dict format for backward compatibility + if hasattr(response, 'content'): + return { + "success": True, + "content": response.content, + "modelName": response.modelName, + "priceUsd": response.priceUsd, + "processingTime": response.processingTime + } + else: + return response except Exception as e: logger.error(f"Error in AI image generation: {str(e)}") return {"success": False, "error": str(e)} diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py index 7608cc1b..1f910e73 100644 --- a/modules/services/serviceExtraction/mainServiceExtraction.py +++ b/modules/services/serviceExtraction/mainServiceExtraction.py @@ -1,11 +1,14 @@ from typing import Any, Dict, List, Optional, Union import uuid import logging +import time from .subRegistry import ExtractorRegistry, ChunkerRegistry from .subPipeline import runExtraction, poolAndLimit, applyAiIfRequested from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, MergeStrategy from modules.datamodels.datamodelChat import ChatDocument +from modules.datamodels.datamodelAi import AiCallResponse +from modules.interfaces.interfaceAiObjects import aiModels logger = logging.getLogger(__name__) @@ -38,6 +41,9 @@ class ExtractionService: logger.info(f"=== DOCUMENT {i}: {doc.fileName} ===") logger.info(f"Initial MIME type: {doc.mimeType}") + # Start timing for this document + startTime = time.time() + # Resolve raw bytes for this document using interface documentBytes = dbInterface.getFileData(doc.fileId) if not documentBytes: @@ -86,6 +92,36 @@ class ExtractionService: logger.debug(f"No chunking needed - {len(ec.parts)} parts fit within size limits") ec = applyAiIfRequested(ec, options) + + # Calculate timing and emit stats + endTime = time.time() + processingTime = endTime - startTime + bytesSent = len(documentBytes) + bytesReceived = sum(len(part.data) if part.data else 0 for part in ec.parts) + + # Emit stats for extraction operation + + # Use internal extraction model for pricing + modelName = "internal_extraction" + priceUsd = aiModels[modelName]["calculatePriceUsd"](processingTime, bytesSent, bytesReceived) + + # Create AiCallResponse with real calculation + aiResponse = AiCallResponse( + content="", # No content for extraction stats needed + modelName=modelName, + priceUsd=priceUsd, + processingTime=processingTime, + bytesSent=bytesSent, + bytesReceived=bytesReceived, + errorCount=0 + ) + + self.services.workflow.storeWorkflowStat( + self.services.workflow, + aiResponse, + f"extraction.process.{doc.mimeType}" + ) + results.append(ec) return results diff --git a/modules/services/serviceGeneration/mainServiceGeneration.py b/modules/services/serviceGeneration/mainServiceGeneration.py index 53e8a848..eb4287af 100644 --- a/modules/services/serviceGeneration/mainServiceGeneration.py +++ b/modules/services/serviceGeneration/mainServiceGeneration.py @@ -1,11 +1,10 @@ import logging import uuid -import json +import time from typing import Any, Dict, List, Optional, Union, Tuple -from datetime import datetime, UTC -import re -from modules.shared.timezoneUtils import get_utc_timestamp from modules.datamodels.datamodelChat import ChatDocument +from modules.datamodels.datamodelAi import AiCallResponse +from modules.interfaces.interfaceAiObjects import aiModels from modules.services.serviceGeneration.subDocumentUtility import ( getFileExtension, getMimeTypeFromExtension, @@ -438,14 +437,80 @@ class GenerationService: ) -> Union[Tuple[str, str], List[Dict[str, Any]]]: """Render report adaptively based on content structure.""" - if isMultiFile and "documents" in extractedContent: - return await self._renderMultiFileReport( - extractedContent, outputFormat, title, userPrompt, aiService + # Start timing for generation + startTime = time.time() + + try: + if isMultiFile and "documents" in extractedContent: + result = await self._renderMultiFileReport( + extractedContent, outputFormat, title, userPrompt, aiService + ) + else: + result = await self._renderSingleFileReport( + extractedContent, outputFormat, title, userPrompt, aiService + ) + + # Calculate timing and emit stats + endTime = time.time() + processingTime = endTime - startTime + + # Calculate bytes (rough estimation) + if isinstance(result, tuple): + content, mime_type = result + bytesReceived = len(content.encode('utf-8')) if isinstance(content, str) else len(content) + elif isinstance(result, list): + bytesReceived = sum(len(str(doc).encode('utf-8')) for doc in result) + else: + bytesReceived = len(str(result).encode('utf-8')) + + # Use internal generation model for pricing + modelName = "internal_generation" + priceUsd = aiModels[modelName]["calculatePriceUsd"](processingTime, 0, bytesReceived) + + aiResponse = AiCallResponse( + content="", # No content for generation stats needed + modelName=modelName, + priceUsd=priceUsd, + processingTime=processingTime, + bytesSent=0, # Input is already processed + bytesReceived=bytesReceived, + errorCount=0 ) - else: - return await self._renderSingleFileReport( - extractedContent, outputFormat, title, userPrompt, aiService + + self.services.workflow.storeWorkflowStat( + self.services.workflow, + aiResponse, + f"generation.render.{outputFormat}" ) + + return result + + except Exception as e: + # Calculate timing for error case + endTime = time.time() + processingTime = endTime - startTime + + # Use internal generation model for pricing + modelName = "internal_generation" + priceUsd = aiModels[modelName]["calculatePriceUsd"](processingTime, 0, 0) + + aiResponse = AiCallResponse( + content="", # No content for generation stats needed + modelName=modelName, + priceUsd=priceUsd, + processingTime=processingTime, + bytesSent=0, + bytesReceived=0, + errorCount=1 + ) + + self.services.workflow.storeWorkflowStat( + self.services.workflow, + aiResponse, + f"generation.render.{outputFormat}" + ) + + raise async def _renderMultiFileReport( self, diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py index e2f03dbb..0b2851e0 100644 --- a/modules/services/serviceWorkflow/mainServiceWorkflow.py +++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py @@ -482,14 +482,6 @@ class WorkflowService: logger.error(f"Error updating workflow: {str(e)}") raise - def updateWorkflowStats(self, workflowId: str, **kwargs): - """Update workflow statistics by delegating to the chat interface""" - try: - return self.interfaceDbChat.updateWorkflowStats(workflowId, **kwargs) - except Exception as e: - logger.error(f"Error updating workflow stats: {str(e)}") - raise - def getWorkflow(self, workflowId: str): """Get workflow by ID by delegating to the chat interface""" try: @@ -549,41 +541,34 @@ class WorkflowService: workflow.logs.append(chatLog) return chatLog - def storeWorkflowStat(self, workflow: Any, statData: Dict[str, Any]) -> Any: - """Persist workflow-level ChatStat and set/replace on in-memory workflow.""" - statData = dict(statData or {}) - statData["workflowId"] = workflow.id - chatInterface = self.interfaceDbChat - # Reuse updateWorkflowStats for incremental or create raw record when needed + def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> Any: + """Persist workflow-level ChatStat from AiCallResponse and append to workflow stats list.""" try: - self.updateWorkflowStats(workflow.id, **{ - 'bytesSent': statData.get('bytesSent', 0), - 'bytesReceived': statData.get('bytesReceived', 0), - 'tokenCount': statData.get('tokenCount', 0) - }) - except Exception: - pass - stat = chatInterface.getWorkflowStats(workflow.id) - workflow.stats = stat - return stat - - def storeMessageStat(self, workflow: Any, messageId: str, statData: Dict[str, Any]) -> Any: - """Persist message-level ChatStat and bind to the message in-memory.""" - statData = dict(statData or {}) - statData["workflowId"] = workflow.id - statData["messageId"] = messageId - # Persist as ChatStat row - try: - self.interfaceDbChat.db.recordCreate(ChatStat, statData) + # Create ChatStat from AiCallResponse data + statData = { + "workflowId": workflow.id, + "process": process, + "engine": aiResponse.modelName, + "priceUsd": aiResponse.priceUsd, + "processingTime": aiResponse.processingTime, + "bytesSent": aiResponse.bytesSent, + "bytesReceived": aiResponse.bytesReceived, + "errorCount": aiResponse.errorCount + } + + # Create the stat record in the database + stat = self.interfaceDbChat.createStat(statData) + + # Append to workflow stats list in memory + if not hasattr(workflow, 'stats') or workflow.stats is None: + workflow.stats = [] + workflow.stats.append(stat) + + return stat except Exception as e: - logger.error(f"Failed to persist message stat: {e}") + logger.error(f"Failed to store workflow stat: {e}") raise - stat = self.interfaceDbChat.getMessageStats(messageId) - for m in workflow.messages or []: - if getattr(m, 'id', None) == messageId: - m.stats = stat - break - return stat + def updateMessage(self, messageId: str, messageData: Dict[str, Any]): """Update message by delegating to the chat interface""" diff --git a/modules/workflows/methods/_EXCLUDED_methodDocument.py b/modules/workflows/methods/_EXCLUDED_methodDocument.py deleted file mode 100644 index d31631ce..00000000 --- a/modules/workflows/methods/_EXCLUDED_methodDocument.py +++ /dev/null @@ -1,252 +0,0 @@ -""" -Document processing method module. -Handles document operations using the document service. -""" - -import logging -import os -from typing import Dict, Any, List, Optional -from datetime import datetime, UTC - -from modules.workflows.methods.methodBase import MethodBase, action -from modules.datamodels.datamodelChat import ActionResult, ActionDocument -from modules.datamodels.datamodelChat import ChatDocument -from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority - -logger = logging.getLogger(__name__) - -class MethodDocument(MethodBase): - """Document method implementation for document operations""" - - def __init__(self, services): - """Initialize the document method""" - super().__init__(services) - self.name = "document" - self.description = "Handle document operations like extraction and analysis" - - def _format_timestamp_for_filename(self) -> str: - """Format current timestamp as YYYYMMDD-hhmmss for filenames.""" - return datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - - @action - async def extract(self, parameters: Dict[str, Any]) -> ActionResult: - """ - GENERAL: - - Purpose: Extract and analyze content from existing documents using AI. - - Input requirements: documentList (required); prompt (required). - - Output format: Plain text per source document (.txt by default). - - Parameters: - - documentList (list, required): Document reference(s) to extract from. - - prompt (str, required): Instruction describing what to extract. - - operationType (str, optional): extract_content | analyze_document | summarize_content. Default: extract_content. - - processDocumentsIndividually (bool, optional): Process each document separately. Default: True. - - chunkAllowed (bool, optional): Allow chunking for large inputs. Default: True. - - outputMimeType (str, optional): MIME type for output file. Options: "text/plain" (default), "application/json", "text/csv", "text/html". Default: "text/plain". - """ - try: - documentList = parameters.get("documentList") - if isinstance(documentList, str): - documentList = [documentList] - prompt = parameters.get("prompt") - operationType = parameters.get("operationType", "extract_content") - processDocumentsIndividually = parameters.get("processDocumentsIndividually", True) - chunkAllowed = parameters.get("chunkAllowed", True) - outputMimeType = parameters.get("outputMimeType", "text/plain") - - if not documentList: - return ActionResult.isFailure( - error="Document list reference is required" - ) - - if not prompt: - return ActionResult.isFailure( - error="Prompt is required" - ) - - chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList) - if not chatDocuments: - return ActionResult.isFailure( - error="No documents found for the provided reference" - ) - - # Use enhanced AI service with integrated extraction - try: - # Build AI call options - ai_options = AiCallOptions( - operationType=operationType, - processDocumentsIndividually=processDocumentsIndividually, - compressContext=not chunkAllowed - ) - - # Add format instructions to prompt based on MIME type - enhanced_prompt = prompt - mime_type_mapping = { - "text/plain": (".txt", "Plain text format"), - "application/json": (".json", "Structured JSON format"), - "text/csv": (".csv", "Table format"), - "text/html": (".html", "HTML format") - } - extension, description = mime_type_mapping.get(outputMimeType, (".txt", "Plain text format")) - enhanced_prompt += f"\n\nPlease format the output as {extension} ({outputMimeType}): {description}" - - # Use enhanced AI service for extraction - ai_response = await self.services.ai.callAi( - prompt=enhanced_prompt, - documents=chatDocuments, - options=ai_options - ) - - logger.info(f"AI extraction completed: {len(ai_response)} characters") - - except Exception as e: - logger.error(f"AI extraction failed: {str(e)}") - ai_response = "" - - if not ai_response or ai_response.strip() == "": - return ActionResult.isFailure( - error="No content could be extracted from any documents" - ) - - # Process each document individually with extracted content - action_documents = [] - - for i, chatDocument in enumerate(chatDocuments): - # Use the AI response directly - it already contains processed content - final_content = ai_response - - # Determine output format based on MIME type - mime_type_mapping = { - "text/plain": ".txt", - "application/json": ".json", - "text/csv": ".csv", - "text/html": ".html" - } - final_extension = mime_type_mapping.get(outputMimeType, ".txt") - final_mime_type = outputMimeType - - # Create meaningful output fileName with workflow context - original_fileName = chatDocument.fileName - base_name = original_fileName.rsplit('.', 1)[0] if '.' in original_fileName else original_fileName - extension = final_extension.lstrip('.') # Remove leading dot for meaningful naming - output_fileName = self._generateMeaningfulFileName( - base_name=f"{base_name}_extracted", - extension=extension, - action_name="extract" - ) - - logger.info(f"Created output document: {output_fileName} with {len(final_content)} characters") - - # Create proper ActionDocument object - action_documents.append(ActionDocument( - documentName=output_fileName, - documentData=final_content, - mimeType=final_mime_type - )) - - return ActionResult.isSuccess( - documents=action_documents - ) - except Exception as e: - logger.error(f"Error extracting content: {str(e)}") - return ActionResult.isFailure( - error=str(e) - ) - - @action - async def generate(self, parameters: Dict[str, Any]) -> ActionResult: - """ - GENERAL: - - Purpose: Generate formatted documents and reports from source documents. - - Input requirements: documentList (required); prompt (required); optional title and outputFormat. - - Any output format, e.g.: html | pdf | docx | txt | md | json | csv | xlsx - - Parameters: - - documentList (list, required): Document reference(s) to include as context. - - prompt (str, required): Instruction describing the desired document/report. - - title (str, optional): Title for the generated document. Default: "Summary Report". - - outputFormat (str, optional): html | pdf | docx | txt | md | json | csv | xlsx. Default: html. - - operationType (str, optional): generate_report | analyze_documents. Default: generate_report. - - processDocumentsIndividually (bool, optional): Process per document. Default: True. - - chunkAllowed (bool, optional): Allow chunking for large inputs. Default: True. - """ - try: - documentList = parameters.get("documentList") - if isinstance(documentList, str): - documentList = [documentList] - prompt = parameters.get("prompt") - title = parameters.get("title", "Summary Report") - outputFormat = parameters.get("outputFormat", "html") - operationType = parameters.get("operationType", "generate_report") - processDocumentsIndividually = parameters.get("processDocumentsIndividually", True) - chunkAllowed = parameters.get("chunkAllowed", True) - - if not documentList: - return ActionResult.isFailure( - error="Document list reference is required" - ) - - if not prompt: - return ActionResult.isFailure( - error="Prompt is required to specify what kind of report to generate" - ) - - chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList) - logger.info(f"Retrieved {len(chatDocuments)} chat documents for report generation") - - if not chatDocuments: - return ActionResult.isFailure( - error="No documents found for the provided reference" - ) - - # Use enhanced AI service with document generation - try: - # Build AI call options - ai_options = AiCallOptions( - operationType=operationType, - processDocumentsIndividually=processDocumentsIndividually, - compressContext=not chunkAllowed - ) - - # Use enhanced AI service with document generation - result = await self.services.ai.callAi( - prompt=prompt, - documents=chatDocuments, - options=ai_options, - outputFormat=outputFormat, - title=title - ) - - if isinstance(result, dict) and result.get("success"): - # Extract document information from result - documents = result.get("documents", []) - if documents: - # Convert to ActionDocument format - action_documents = [] - for doc in documents: - action_documents.append(ActionDocument( - documentName=doc["documentName"], - documentData=doc["documentData"], - mimeType=doc["mimeType"] - )) - - logger.info(f"Generated {outputFormat.upper()} report: {len(action_documents)} documents") - return ActionResult.isSuccess(documents=action_documents) - else: - return ActionResult.isFailure(error="No documents generated") - else: - error_msg = result.get("error", "Unknown error") if isinstance(result, dict) else "AI generation failed" - return ActionResult.isFailure(error=error_msg) - - except Exception as e: - logger.error(f"AI generation failed: {str(e)}") - return ActionResult.isFailure(error=str(e)) - - except Exception as e: - logger.error(f"Error generating report: {str(e)}") - return ActionResult.isFailure( - error=str(e) - ) - - - diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index 5c04ba0b..b47bfb97 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -93,20 +93,11 @@ class WorkflowManager: "messageIds": [], "workflowMode": workflowMode, "maxSteps": 5 if workflowMode == "React" else 1, # Set maxSteps for React mode - "stats": { - "processingTime": None, - "tokenCount": None, - "bytesSent": None, - "bytesReceived": None, - "successRate": None, - "errorCount": None - } } workflow = self.services.workflow.createWorkflow(workflowData) logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}") logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}") - self.services.workflow.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0) self.services.currentWorkflow = workflow