From 16db2d91c64312744bba383403b0e6954c53b92f Mon Sep 17 00:00:00 2001 From: patrick-motsch Date: Tue, 3 Mar 2026 18:57:20 +0100 Subject: [PATCH] fixed critical code issues --- modules/workflows/automation/mainWorkflow.py | 27 +-- .../automation/subAutomationSchedule.py | 2 +- .../automation/subAutomationTemplates.py | 2 +- .../automation/subAutomationUtils.py | 70 +++---- .../methods/methodAi/actions/generateCode.py | 12 +- .../methodAi/actions/generateDocument.py | 14 +- .../methods/methodAi/actions/process.py | 13 +- .../methods/methodAi/actions/webResearch.py | 11 +- modules/workflows/methods/methodBase.py | 14 +- .../methodChatbot/actions/queryDatabase.py | 19 +- .../methodContext/actions/extractContent.py | 10 +- .../methodContext/actions/getDocumentIndex.py | 15 +- .../methodContext/actions/neutralizeData.py | 10 +- .../methodOutlook/actions/readEmails.py | 7 +- .../methodOutlook/actions/searchEmails.py | 4 +- .../methodOutlook/actions/sendDraftEmail.py | 10 + .../methodOutlook/helpers/connection.py | 20 +- .../methodOutlook/helpers/emailProcessing.py | 16 +- .../actions/uploadDocument.py | 7 +- .../methodSharepoint/helpers/apiClient.py | 92 ++++----- .../adaptive/adaptiveLearningEngine.py | 32 ++-- .../processing/adaptive/contentValidator.py | 55 +++--- .../processing/adaptive/learningEngine.py | 5 + .../processing/core/actionExecutor.py | 11 +- .../processing/core/messageCreator.py | 57 ++---- .../workflows/processing/core/taskPlanner.py | 75 +------- .../workflows/processing/core/validator.py | 33 ++-- .../processing/modes/modeAutomation.py | 86 +-------- .../workflows/processing/modes/modeBase.py | 78 +++++++- .../workflows/processing/modes/modeDynamic.py | 178 +++--------------- .../processing/shared/executionState.py | 42 +---- .../processing/shared/methodDiscovery.py | 116 +++--------- .../processing/shared/placeholderFactory.py | 109 ++++------- .../shared/promptGenerationActionsDynamic.py | 19 +- .../workflows/processing/workflowProcessor.py | 37 ++-- modules/workflows/workflowManager.py | 64 ++----- 36 files changed, 494 insertions(+), 878 deletions(-) diff --git a/modules/workflows/automation/mainWorkflow.py b/modules/workflows/automation/mainWorkflow.py index f1b91939..1fc4b0cf 100644 --- a/modules/workflows/automation/mainWorkflow.py +++ b/modules/workflows/automation/mainWorkflow.py @@ -56,12 +56,12 @@ async def chatStart(currentUser: User, userInput: UserInputRequest, workflowMode logger.error(f"Error starting chat: {str(e)}") raise -async def chatStop(currentUser: User, workflowId: str, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None) -> ChatWorkflow: +async def chatStop(currentUser: User, workflowId: str, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None) -> ChatWorkflow: """Stops a running chat.""" try: services = getServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId) - if featureInstanceId: - services.featureCode = 'chatplayground' + if featureCode: + services.featureCode = featureCode workflowManager = WorkflowManager(services) return await workflowManager.workflowStop(workflowId) except Exception as e: @@ -101,8 +101,11 @@ async def executeAutomation(automationId: str, automation, creatorUser: User, se logger.debug(f"Automation {automationId} restricted to providers: {automation.allowedProviders}") # Context comes EXCLUSIVELY from the automation definition - automationMandateId = str(automation.mandateId) - automationFeatureInstanceId = 
str(automation.featureInstanceId) + automationMandateId = str(automation.mandateId) if automation.mandateId is not None else None + automationFeatureInstanceId = str(automation.featureInstanceId) if automation.featureInstanceId is not None else None + + if not automationMandateId or not automationFeatureInstanceId: + raise ValueError(f"Automation {automationId} missing mandateId or featureInstanceId") logger.info(f"Executing automation {automationId} as user {creatorUser.id} with mandateId={automationMandateId}, featureInstanceId={automationFeatureInstanceId}") @@ -118,7 +121,7 @@ async def executeAutomation(automationId: str, automation, creatorUser: User, se logger.error(f"Placeholders: {placeholders}") logger.error(f"Generated planJson (first 1000 chars): {planJson[:1000]}") logger.error(f"Error position: line {e.lineno}, column {e.colno}, char {e.pos}") - if e.pos: + if e.pos is not None: start = max(0, e.pos - 100) end = min(len(planJson), e.pos + 100) logger.error(f"Context around error: ...{planJson[start:end]}...") @@ -233,20 +236,10 @@ def syncAutomationEvents(services, eventUser) -> Dict[str, Any]: cronKwargs = parseScheduleToCron(schedule) if isActive: - # Remove existing event if present (handles schedule changes) - if currentEventId: - try: - eventManager.remove(currentEventId) - except Exception as e: - logger.warning(f"Error removing old event {currentEventId}: {str(e)}") - - # Register new event newEventId = f"automation.{automationId}" - - # Create event handler function handler = createAutomationEventHandler(automationId, eventUser) - # Register cron job + # Register with replaceExisting=True (atomically replaces old event) eventManager.registerCron( jobId=newEventId, func=handler, diff --git a/modules/workflows/automation/subAutomationSchedule.py b/modules/workflows/automation/subAutomationSchedule.py index 40638461..9db1f3fa 100644 --- a/modules/workflows/automation/subAutomationSchedule.py +++ b/modules/workflows/automation/subAutomationSchedule.py @@ -48,7 +48,7 @@ def start(eventUser) -> bool: except Exception as e: logger.error(f"Automation: Error setting up events on startup: {str(e)}") - # Don't fail startup if automation sync fails + return False return True diff --git a/modules/workflows/automation/subAutomationTemplates.py b/modules/workflows/automation/subAutomationTemplates.py index 95c1eb77..eb131f0a 100644 --- a/modules/workflows/automation/subAutomationTemplates.py +++ b/modules/workflows/automation/subAutomationTemplates.py @@ -6,7 +6,7 @@ Automation templates for workflow definitions. Contains predefined workflow templates that can be used to create automation definitions. 
""" -from typing import Dict, Any, List +from typing import Dict, Any # Automation templates structure AUTOMATION_TEMPLATES: Dict[str, Any] = { diff --git a/modules/workflows/automation/subAutomationUtils.py b/modules/workflows/automation/subAutomationUtils.py index 97d28719..bdac6efb 100644 --- a/modules/workflows/automation/subAutomationUtils.py +++ b/modules/workflows/automation/subAutomationUtils.py @@ -69,50 +69,42 @@ def replacePlaceholders(template: str, placeholders: Dict[str, str]) -> str: result = result.replace(arrayPattern, arrayValue) continue # Skip the regular replacement below - # Regular replacement - check if in quoted context - patternStart = result.find(pattern) - isQuoted = False - if patternStart > 0: - charBefore = result[patternStart - 1] if patternStart > 0 else None - patternEnd = patternStart + len(pattern) - charAfter = result[patternEnd] if patternEnd < len(result) else None - if charBefore == '"' and charAfter == '"': - isQuoted = True - - # Handle different value types - if isinstance(value, (list, dict)): - # Python list/dict - convert to JSON - replacement = json.dumps(value) - elif isinstance(value, str): - # String value - check if it's a JSON string representing list/dict - try: - parsed = json.loads(value) - if isinstance(parsed, (list, dict)): - # It's a JSON string of a list/dict - if isQuoted: - # In quoted context, escape the JSON string - escaped = json.dumps(value) - replacement = escaped[1:-1] # Remove outer quotes + # Replace occurrences one-by-one to handle mixed contexts + while pattern in result: + patternStart = result.find(pattern) + isQuoted = False + if patternStart > 0: + charBefore = result[patternStart - 1] + patternEnd = patternStart + len(pattern) + charAfter = result[patternEnd] if patternEnd < len(result) else None + if charBefore == '"' and charAfter == '"': + isQuoted = True + + if isinstance(value, (list, dict)): + replacement = json.dumps(value) + elif isinstance(value, str): + try: + parsed = json.loads(value) + if isinstance(parsed, (list, dict)): + if isQuoted: + escaped = json.dumps(value) + replacement = escaped[1:-1] + else: + replacement = value else: - # In unquoted context, use JSON directly - replacement = value - else: - # It's a JSON string of a primitive + if isQuoted: + escaped = json.dumps(value) + replacement = escaped[1:-1] + else: + replacement = value + except (json.JSONDecodeError, ValueError): if isQuoted: escaped = json.dumps(value) replacement = escaped[1:-1] else: replacement = value - except (json.JSONDecodeError, ValueError): - # Not valid JSON - treat as plain string - if isQuoted: - escaped = json.dumps(value) - replacement = escaped[1:-1] - else: - replacement = value - else: - # Numbers, booleans, None - convert to string - replacement = str(value) - result = result.replace(pattern, replacement) + else: + replacement = str(value) + result = result[:patternStart] + replacement + result[patternStart + len(pattern):] return result diff --git a/modules/workflows/methods/methodAi/actions/generateCode.py b/modules/workflows/methods/methodAi/actions/generateCode.py index 4f9bbd21..c616006b 100644 --- a/modules/workflows/methods/methodAi/actions/generateCode.py +++ b/modules/workflows/methods/methodAi/actions/generateCode.py @@ -74,7 +74,11 @@ async def generateCode(self, parameters: Dict[str, Any]) -> ActionResult: documentName=docData.documentName, documentData=docData.documentData, mimeType=docData.mimeType, - sourceJson=docData.sourceJson if hasattr(docData, 'sourceJson') else None + 
sourceJson=docData.sourceJson if hasattr(docData, 'sourceJson') else None, + validationMetadata={ + "actionType": "ai.generateCode", + "resultType": resultType, + } )) # If no documents but content exists, create a document from content @@ -112,7 +116,11 @@ async def generateCode(self, parameters: Dict[str, Any]) -> ActionResult: documents.append(ActionDocument( documentName=docName, documentData=aiResponse.content.encode('utf-8') if isinstance(aiResponse.content, str) else aiResponse.content, - mimeType=mimeType + mimeType=mimeType, + validationMetadata={ + "actionType": "ai.generateCode", + "resultType": resultType, + } )) return ActionResult.isSuccess(documents=documents) diff --git a/modules/workflows/methods/methodAi/actions/generateDocument.py b/modules/workflows/methods/methodAi/actions/generateDocument.py index 65e95a32..8bb33f9d 100644 --- a/modules/workflows/methods/methodAi/actions/generateDocument.py +++ b/modules/workflows/methods/methodAi/actions/generateDocument.py @@ -78,7 +78,12 @@ async def generateDocument(self, parameters: Dict[str, Any]) -> ActionResult: documentName=docData.documentName, documentData=docData.documentData, mimeType=docData.mimeType, - sourceJson=docData.sourceJson if hasattr(docData, 'sourceJson') else None + sourceJson=docData.sourceJson if hasattr(docData, 'sourceJson') else None, + validationMetadata={ + "actionType": "ai.generateDocument", + "documentType": documentType, + "resultType": resultType, + } )) # If no documents but content exists, create a document from content @@ -112,7 +117,12 @@ async def generateDocument(self, parameters: Dict[str, Any]) -> ActionResult: documents.append(ActionDocument( documentName=docName, documentData=aiResponse.content.encode('utf-8') if isinstance(aiResponse.content, str) else aiResponse.content, - mimeType=mimeType + mimeType=mimeType, + validationMetadata={ + "actionType": "ai.generateDocument", + "documentType": documentType, + "resultType": resultType, + } )) return ActionResult.isSuccess(documents=documents) diff --git a/modules/workflows/methods/methodAi/actions/process.py b/modules/workflows/methods/methodAi/actions/process.py index 752fe7f6..b4157f13 100644 --- a/modules/workflows/methods/methodAi/actions/process.py +++ b/modules/workflows/methods/methodAi/actions/process.py @@ -12,8 +12,8 @@ from modules.datamodels.datamodelExtraction import ContentPart logger = logging.getLogger(__name__) async def process(self, parameters: Dict[str, Any]) -> ActionResult: + operationId = None try: - # Init progress logger workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" operationId = f"ai_process_{workflowId}_{int(time.time())}" @@ -83,7 +83,8 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult: output_format = None logger.debug("resultType not provided - formats will be determined from prompt by AI") - output_mime_type = "application/octet-stream" # Prefer service-provided mimeType when available + mimeMap = {"txt": "text/plain", "json": "application/json", "html": "text/html", "md": "text/markdown", "csv": "text/csv", "xml": "application/xml"} + output_mime_type = mimeMap.get(normalized_result_type, "text/plain") if normalized_result_type else "text/plain" # Phase 7.3: Pass both documentList and contentParts to AI service # (Extraction logic removed - handled by AI service) @@ -264,11 +265,11 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult: except Exception as e: logger.error(f"Error in AI processing: {str(e)}") - # 
Complete progress tracking with failure try: - self.services.chat.progressLogFinish(operationId, False) - except: - pass # Don't fail on progress logging errors + if operationId: + self.services.chat.progressLogFinish(operationId, False) + except Exception: + pass return ActionResult.isFailure( error=str(e) diff --git a/modules/workflows/methods/methodAi/actions/webResearch.py b/modules/workflows/methods/methodAi/actions/webResearch.py index 62b43bce..d59a26f9 100644 --- a/modules/workflows/methods/methodAi/actions/webResearch.py +++ b/modules/workflows/methods/methodAi/actions/webResearch.py @@ -4,18 +4,19 @@ import logging import time import re +import json from typing import Dict, Any from modules.datamodels.datamodelChat import ActionResult, ActionDocument logger = logging.getLogger(__name__) async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult: + operationId = None try: prompt = parameters.get("prompt") if not prompt: return ActionResult.isFailure(error="Research prompt is required") - # Init progress logger workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" operationId = f"web_research_{workflowId}_{int(time.time())}" @@ -78,9 +79,10 @@ async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult: "researchDepth": parameters.get("researchDepth", "general"), "resultFormat": "json" } + documentData = json.dumps(result, ensure_ascii=False) if isinstance(result, dict) else result actionDocument = ActionDocument( documentName=meaningfulName, - documentData=result, + documentData=documentData, mimeType="application/json", validationMetadata=validationMetadata ) @@ -90,8 +92,9 @@ async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult: except Exception as e: logger.error(f"Error in web research: {str(e)}") try: - self.services.chat.progressLogFinish(operationId, False) - except: + if operationId: + self.services.chat.progressLogFinish(operationId, False) + except Exception: pass return ActionResult.isFailure(error=str(e)) diff --git a/modules/workflows/methods/methodBase.py b/modules/workflows/methods/methodBase.py index 173023f1..1a81c3eb 100644 --- a/modules/workflows/methods/methodBase.py +++ b/modules/workflows/methods/methodBase.py @@ -1,11 +1,10 @@ # Copyright (c) 2025 Patrick Motsch # All rights reserved. 
-from typing import Dict, List, Optional, Any, Literal +from typing import Dict, List, Optional, Any from datetime import datetime, UTC import logging from functools import wraps -import inspect from modules.datamodels.datamodelWorkflowActions import WorkflowActionDefinition, WorkflowActionParameter from modules.datamodels.datamodelRbac import AccessRuleContext @@ -258,9 +257,13 @@ class MethodBase: raise ValueError(f"Expected dict for type '{expectedType}', got {type(value).__name__}") return value - # Handle simple types + # Handle simple types (bool must be checked before int since bool is subclass of int) if expectedType in typeMap: expectedTypeClass = typeMap[expectedType] + if expectedType == 'int' and isinstance(value, bool): + raise ValueError(f"Expected int, got bool: {value}") + if expectedType == 'bool' and isinstance(value, int) and not isinstance(value, bool): + return bool(value) if not isinstance(value, expectedTypeClass): try: return expectedTypeClass(value) @@ -290,10 +293,11 @@ class MethodBase: def getActionSignature(self, actionName: str) -> str: """Get formatted action signature for AI prompt generation (detailed version)""" - if actionName not in self.actions: + allActions = self.actions + if actionName not in allActions: return "" - action = self.actions[actionName] + action = allActions[actionName] paramList = [] # Extract detailed parameter information from docstring diff --git a/modules/workflows/methods/methodChatbot/actions/queryDatabase.py b/modules/workflows/methods/methodChatbot/actions/queryDatabase.py index ff7e896f..8622a5a9 100644 --- a/modules/workflows/methods/methodChatbot/actions/queryDatabase.py +++ b/modules/workflows/methods/methodChatbot/actions/queryDatabase.py @@ -89,14 +89,26 @@ async def queryDatabase(self, parameters: Dict[str, Any]) -> ActionResult: # Update progress self.services.chat.progressLogUpdate(operationId, 0.3, "Validating query") + # Validate: only SELECT queries allowed + sqlNormalized = sqlQuery.strip().upper() + if not sqlNormalized.startswith("SELECT"): + return ActionResult.isFailure(error="Only SELECT queries are allowed") + forbiddenKeywords = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE", "TRUNCATE", "EXEC", "EXECUTE"] + for kw in forbiddenKeywords: + if f" {kw} " in f" {sqlNormalized} " or sqlNormalized.startswith(f"{kw} "): + return ActionResult.isFailure(error=f"Forbidden SQL keyword detected: {kw}") + # Initialize connector connector = PreprocessorConnector() # Update progress self.services.chat.progressLogUpdate(operationId, 0.5, "Executing query") - # Execute query - result = await connector.executeQuery(sqlQuery) + try: + result = await connector.executeQuery(sqlQuery) + except Exception: + await connector.close() + raise # Update progress self.services.chat.progressLogUpdate(operationId, 0.8, "Formatting results") @@ -134,10 +146,9 @@ async def queryDatabase(self, parameters: Dict[str, Any]) -> ActionResult: except Exception as e: logger.error(f"Error executing database query: {str(e)}") - # Complete progress tracking with failure try: self.services.chat.progressLogFinish(operationId, False) - except: + except Exception: pass return ActionResult.isFailure( diff --git a/modules/workflows/methods/methodContext/actions/extractContent.py b/modules/workflows/methods/methodContext/actions/extractContent.py index 5b90ce13..466165ad 100644 --- a/modules/workflows/methods/methodContext/actions/extractContent.py +++ b/modules/workflows/methods/methodContext/actions/extractContent.py @@ -11,8 +11,8 @@ from 
modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrat logger = logging.getLogger(__name__) async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult: + operationId = None try: - # Init progress logger workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" operationId = f"context_extract_{workflowId}_{int(time.time())}" @@ -208,11 +208,11 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult: except Exception as e: logger.error(f"Error in content extraction: {str(e)}") - # Complete progress tracking with failure try: - self.services.chat.progressLogFinish(operationId, False) - except: - pass # Don't fail on progress logging errors + if operationId: + self.services.chat.progressLogFinish(operationId, False) + except Exception: + pass return ActionResult.isFailure(error=str(e)) diff --git a/modules/workflows/methods/methodContext/actions/getDocumentIndex.py b/modules/workflows/methods/methodContext/actions/getDocumentIndex.py index 9991285b..b2822e0d 100644 --- a/modules/workflows/methods/methodContext/actions/getDocumentIndex.py +++ b/modules/workflows/methods/methodContext/actions/getDocumentIndex.py @@ -22,14 +22,13 @@ async def getDocumentIndex(self, parameters: Dict[str, Any]) -> ActionResult: documentsIndex = self.services.chat.getAvailableDocuments(workflow) if not documentsIndex or documentsIndex == "No documents available" or documentsIndex == "NO DOCUMENTS AVAILABLE - This workflow has no documents to process.": - # Return empty index structure + indexData = { + "workflowId": getattr(workflow, 'id', 'unknown'), + "totalDocuments": 0, + "rounds": [], + "documentReferences": [] + } if resultType == "json": - indexData = { - "workflowId": getattr(workflow, 'id', 'unknown'), - "totalDocuments": 0, - "rounds": [], - "documentReferences": [] - } indexContent = json.dumps(indexData, indent=2, ensure_ascii=False) else: indexContent = "Document Index\n==============\n\nNo documents available in this workflow.\n" @@ -64,7 +63,7 @@ async def getDocumentIndex(self, parameters: Dict[str, Any]) -> ActionResult: document = ActionDocument( documentName=filename, documentData=indexContent, - mimeType="application/json" if resultType == "json" else "text/plain", + mimeType="application/json" if resultType == "json" else ("text/markdown" if resultType == "md" else "text/plain"), validationMetadata=validationMetadata ) diff --git a/modules/workflows/methods/methodContext/actions/neutralizeData.py b/modules/workflows/methods/methodContext/actions/neutralizeData.py index 8e3b7185..d5ec045b 100644 --- a/modules/workflows/methods/methodContext/actions/neutralizeData.py +++ b/modules/workflows/methods/methodContext/actions/neutralizeData.py @@ -11,8 +11,8 @@ from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart logger = logging.getLogger(__name__) async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult: + operationId = None try: - # Init progress logger workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" operationId = f"context_neutralize_{workflowId}_{int(time.time())}" @@ -228,10 +228,10 @@ async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult: except Exception as e: logger.error(f"Error in data neutralization: {str(e)}") - # Complete progress tracking with failure try: - self.services.chat.progressLogFinish(operationId, False) - except: - pass # Don't fail on progress logging 
errors + if operationId: + self.services.chat.progressLogFinish(operationId, False) + except Exception: + pass return ActionResult.isFailure(error=str(e)) diff --git a/modules/workflows/methods/methodOutlook/actions/readEmails.py b/modules/workflows/methods/methodOutlook/actions/readEmails.py index 2d325d9f..f388f818 100644 --- a/modules/workflows/methods/methodOutlook/actions/readEmails.py +++ b/modules/workflows/methods/methodOutlook/actions/readEmails.py @@ -29,7 +29,7 @@ async def readEmails(self, parameters: Dict[str, Any]) -> ActionResult: connectionReference = parameters.get("connectionReference") folder = parameters.get("folder", "Inbox") - limit = parameters.get("limit", 10) + limit = parameters.get("limit", 1000) filter = parameters.get("filter") outputMimeType = parameters.get("outputMimeType", "application/json") @@ -110,7 +110,6 @@ async def readEmails(self, parameters: Dict[str, Any]) -> ActionResult: if response.status_code != 200: logger.error(f"Graph API error: {response.status_code} - {response.text}") logger.error(f"Request URL: {response.url}") - logger.error(f"Request headers: {headers}") logger.error(f"Request params: {params}") response.raise_for_status() @@ -217,8 +216,8 @@ async def readEmails(self, parameters: Dict[str, Any]) -> ActionResult: if operationId: try: self.services.chat.progressLogFinish(operationId, False) - except: - pass # Don't fail on progress logging errors + except Exception: + pass return ActionResult.isFailure( error=str(e) ) diff --git a/modules/workflows/methods/methodOutlook/actions/searchEmails.py b/modules/workflows/methods/methodOutlook/actions/searchEmails.py index f8831d59..c7f839b6 100644 --- a/modules/workflows/methods/methodOutlook/actions/searchEmails.py +++ b/modules/workflows/methods/methodOutlook/actions/searchEmails.py @@ -93,7 +93,7 @@ async def searchEmails(self, parameters: Dict[str, Any]) -> ActionResult: try: error_data = response.json() logger.error(f"Microsoft Graph API error: {response.status_code} - {error_data}") - except: + except Exception: logger.error(f"Microsoft Graph API error: {response.status_code} - {response.text}") # Check for specific error types and provide helpful messages @@ -111,8 +111,6 @@ async def searchEmails(self, parameters: Dict[str, Any]) -> ActionResult: raise Exception(f"Microsoft Graph API returned {response.status_code}: {response.text}") - response.raise_for_status() - search_data = response.json() emails = search_data.get("value", []) diff --git a/modules/workflows/methods/methodOutlook/actions/sendDraftEmail.py b/modules/workflows/methods/methodOutlook/actions/sendDraftEmail.py index 15c35f44..1c0c80d4 100644 --- a/modules/workflows/methods/methodOutlook/actions/sendDraftEmail.py +++ b/modules/workflows/methods/methodOutlook/actions/sendDraftEmail.py @@ -293,8 +293,18 @@ async def sendDraftEmail(self, parameters: Dict[str, Any]) -> ActionResult: except ImportError: logger.error("requests module not available") + if operationId: + try: + self.services.chat.progressLogFinish(operationId, False) + except Exception: + pass return ActionResult.isFailure(error="requests module not available") except Exception as e: logger.error(f"Error in sendDraftEmail: {str(e)}") + if operationId: + try: + self.services.chat.progressLogFinish(operationId, False) + except Exception: + pass return ActionResult.isFailure(error=str(e)) diff --git a/modules/workflows/methods/methodOutlook/helpers/connection.py b/modules/workflows/methods/methodOutlook/helpers/connection.py index 12621fd3..cd42b7f5 100644 --- 
a/modules/workflows/methods/methodOutlook/helpers/connection.py +++ b/modules/workflows/methods/methodOutlook/helpers/connection.py @@ -40,25 +40,21 @@ class ConnectionHelper: logger.debug(f"Found connection: {userConnection.id}, status: {userConnection.status.value}, authority: {userConnection.authority.value}") - # Get a fresh token for this connection - token = self.services.chat.getFreshConnectionToken(userConnection.id) - if not token: - logger.error(f"Fresh token not found for connection: {userConnection.id}") - logger.debug(f"Connection details: {userConnection}") - return None - - logger.debug(f"Fresh token retrieved for connection {userConnection.id}") - - # Check if connection is active + # Check status BEFORE fetching token (avoids unnecessary network call) if userConnection.status.value != "active": logger.error(f"Connection is not active: {userConnection.id}, status: {userConnection.status.value}") return None + token = self.services.chat.getFreshConnectionToken(userConnection.id) + if not token: + logger.error(f"Fresh token not found for connection: {userConnection.id}") + return None + + logger.debug(f"Fresh token retrieved for connection {userConnection.id}") + return { "id": userConnection.id, "accessToken": token.tokenAccess, - "refreshToken": token.tokenRefresh, - "scopes": ["Mail.ReadWrite", "Mail.Send", "Mail.ReadWrite.Shared", "User.Read"] # Valid Microsoft Graph API scopes } except Exception as e: logger.error(f"Error getting Microsoft connection: {str(e)}") diff --git a/modules/workflows/methods/methodOutlook/helpers/emailProcessing.py b/modules/workflows/methods/methodOutlook/helpers/emailProcessing.py index 88644a33..f1736221 100644 --- a/modules/workflows/methods/methodOutlook/helpers/emailProcessing.py +++ b/modules/workflows/methods/methodOutlook/helpers/emailProcessing.py @@ -57,10 +57,10 @@ class EmailProcessingHelper: # This is an advanced search query, return as-is return clean_query - # For basic text search, ensure it's safe for contains() filter - # Remove any characters that might break the OData filter syntax - # Remove or escape characters that could break OData filter syntax - safe_query = re.sub(r'[\\\'"]', '', clean_query) + # Escape single quotes for OData safety (double them) + safe_query = clean_query.replace("'", "''") + # Remove backslashes and double quotes + safe_query = re.sub(r'[\\"]', '', safe_query) return safe_query @@ -173,12 +173,14 @@ class EmailProcessingHelper: # Handle email address filters (only if it's NOT a search query) if '@' in filter_text and '.' in filter_text and ' ' not in filter_text and not filter_text.startswith('from:'): - return {"$filter": f"from/fromAddress/address eq '{filter_text}'"} + safeEmail = filter_text.replace("'", "''") + return {"$filter": f"from/fromAddress/address eq '{safeEmail}'"} # Handle OData filter conditions (contains 'eq', 'ne', 'gt', 'lt', etc.) 
if any(op in filter_text.lower() for op in [' eq ', ' ne ', ' gt ', ' lt ', ' ge ', ' le ', ' and ', ' or ']): return {"$filter": filter_text} - # Handle text content - search in subject - return {"$filter": f"contains(subject,'{filter_text}')"} + # Handle text content - search in subject (escape single quotes) + safeText = filter_text.replace("'", "''") + return {"$filter": f"contains(subject,'{safeText}')"} diff --git a/modules/workflows/methods/methodSharepoint/actions/uploadDocument.py b/modules/workflows/methods/methodSharepoint/actions/uploadDocument.py index e9361853..c68133d5 100644 --- a/modules/workflows/methods/methodSharepoint/actions/uploadDocument.py +++ b/modules/workflows/methods/methodSharepoint/actions/uploadDocument.py @@ -240,11 +240,12 @@ async def uploadDocument(self, parameters: Dict[str, Any]) -> ActionResult: } successfulUploads = len([r for r in uploadResults if r.get("uploadStatus") == "success"]) + overallSuccess = successfulUploads > 0 self.services.chat.progressLogUpdate(operationId, 0.9, f"Uploaded {successfulUploads}/{len(uploadResults)} file(s)") - self.services.chat.progressLogFinish(operationId, successfulUploads > 0) + self.services.chat.progressLogFinish(operationId, overallSuccess) return ActionResult( - success=True, + success=overallSuccess, documents=[ ActionDocument( documentName=self._generateMeaningfulFileName("sharepoint_upload", "json", None, "uploadDocument"), @@ -260,7 +261,7 @@ async def uploadDocument(self, parameters: Dict[str, Any]) -> ActionResult: if operationId: try: self.services.chat.progressLogFinish(operationId, False) - except: + except Exception: pass return ActionResult( success=False, diff --git a/modules/workflows/methods/methodSharepoint/helpers/apiClient.py b/modules/workflows/methods/methodSharepoint/helpers/apiClient.py index 542e6dde..5b02aaab 100644 --- a/modules/workflows/methods/methodSharepoint/helpers/apiClient.py +++ b/modules/workflows/methods/methodSharepoint/helpers/apiClient.py @@ -17,14 +17,20 @@ class ApiClientHelper: """Helper for Microsoft Graph API calls""" def __init__(self, methodInstance): - """ - Initialize API client helper. 
- - Args: - methodInstance: Instance of MethodSharepoint (for access to services) - """ self.method = methodInstance self.services = methodInstance.services + self._session: aiohttp.ClientSession = None + + async def _getSession(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + timeout = aiohttp.ClientTimeout(total=30) + self._session = aiohttp.ClientSession(timeout=timeout) + return self._session + + async def close(self): + if self._session and not self._session.closed: + await self._session.close() + self._session = None async def makeGraphApiCall(self, endpoint: str, method: str = "GET", data: bytes = None) -> Dict[str, Any]: """ @@ -50,60 +56,28 @@ class ApiClientHelper: url = f"https://graph.microsoft.com/v1.0/{endpoint}" logger.info(f"Making Graph API call: {method} {url}") - # Set timeout to 30 seconds - timeout = aiohttp.ClientTimeout(total=30) + session = await self._getSession() - async with aiohttp.ClientSession(timeout=timeout) as session: - if method == "GET": - logger.debug(f"Starting GET request to {url}") - async with session.get(url, headers=headers) as response: - logger.info(f"Graph API response: {response.status}") - if response.status == 200: - result = await response.json() - logger.debug(f"Graph API success: {len(str(result))} characters response") - return result - else: - errorText = await response.text() - logger.error(f"Graph API call failed: {response.status} - {errorText}") - return {"error": f"API call failed: {response.status} - {errorText}"} - - elif method == "PUT": - logger.debug(f"Starting PUT request to {url}") - async with session.put(url, headers=headers, data=data) as response: - logger.info(f"Graph API response: {response.status}") - if response.status in [200, 201]: - result = await response.json() - logger.debug(f"Graph API success: {len(str(result))} characters response") - return result - else: - errorText = await response.text() - logger.error(f"Graph API call failed: {response.status} - {errorText}") - return {"error": f"API call failed: {response.status} - {errorText}"} - - elif method == "POST": - logger.debug(f"Starting POST request to {url}") - async with session.post(url, headers=headers, data=data) as response: - logger.info(f"Graph API response: {response.status}") - if response.status in [200, 201]: - result = await response.json() - logger.debug(f"Graph API success: {len(str(result))} characters response") - return result - else: - errorText = await response.text() - logger.error(f"Graph API call failed: {response.status} - {errorText}") - return {"error": f"API call failed: {response.status} - {errorText}"} - - elif method == "DELETE": - logger.debug(f"Starting DELETE request to {url}") - async with session.delete(url, headers=headers) as response: - logger.info(f"Graph API response: {response.status}") - if response.status in [200, 204]: - logger.debug(f"Graph API DELETE success") - return {"success": True} - else: - errorText = await response.text() - logger.error(f"Graph API call failed: {response.status} - {errorText}") - return {"error": f"API call failed: {response.status} - {errorText}"} + successCodes = {"GET": [200], "PUT": [200, 201], "POST": [200, 201], "DELETE": [200, 204]} + httpMethod = getattr(session, method.lower(), None) + if not httpMethod: + return {"error": f"Unsupported HTTP method: {method}"} + + kwargs = {"headers": headers} + if data is not None: + kwargs["data"] = data + + async with httpMethod(url, **kwargs) as response: + logger.info(f"Graph API response: 
{response.status}") + if response.status in successCodes.get(method, [200]): + if method == "DELETE": + return {"success": True} + result = await response.json() + return result + else: + errorText = await response.text() + logger.error(f"Graph API call failed: {response.status} - {errorText}") + return {"error": f"API call failed: {response.status} - {errorText}"} except asyncio.TimeoutError: logger.error(f"Graph API call timed out after 30 seconds: {endpoint}") diff --git a/modules/workflows/processing/adaptive/adaptiveLearningEngine.py b/modules/workflows/processing/adaptive/adaptiveLearningEngine.py index 7efdaef0..18588cf2 100644 --- a/modules/workflows/processing/adaptive/adaptiveLearningEngine.py +++ b/modules/workflows/processing/adaptive/adaptiveLearningEngine.py @@ -14,11 +14,19 @@ class AdaptiveLearningEngine: """Enhanced learning engine that tracks validation patterns and adapts prompts""" def __init__(self): - self.validationHistory = [] # Store validation results with context - self.failurePatterns = defaultdict(list) # Track failure patterns by action type - self.successPatterns = defaultdict(list) # Track success patterns - self.actionAttempts = defaultdict(int) # Track attempt counts per action - self.learningInsights = {} # Store learned insights per workflow + self.validationHistory = [] + self.failurePatterns = defaultdict(list) + self.successPatterns = defaultdict(list) + self.actionAttempts = defaultdict(int) + self.learningInsights = {} + + def reset(self): + """Reset all learned state for a new workflow session.""" + self.validationHistory.clear() + self.failurePatterns.clear() + self.successPatterns.clear() + self.actionAttempts.clear() + self.learningInsights.clear() def recordValidationResult(self, validationResult: Dict[str, Any], actionContext: Dict[str, Any], workflowId: str, attemptNumber: int): @@ -195,15 +203,6 @@ class AdaptiveLearningEngine: for issue, count in list(commonIssues.items())[:3]: # Top 3 issues guidance_parts.append(f"- {issue} (occurred {count} times)") - # Add specific action guidance based on user prompt - if "email" in userPrompt.lower() and "outlook" in userPrompt.lower(): - if any("account" in str(issue).lower() for issue in commonIssues.keys()): - guidance_parts.append("SPECIFIC GUIDANCE: Ensure email is sent from the correct account (valueon).") - if any("attachment" in str(issue).lower() for issue in commonIssues.keys()): - guidance_parts.append("SPECIFIC GUIDANCE: Verify PDF attachment is properly included.") - if any("summary" in str(issue).lower() for issue in commonIssues.keys()): - guidance_parts.append("SPECIFIC GUIDANCE: Include German summary in email body.") - return "\n".join(guidance_parts) if guidance_parts else "No specific guidance available." 
def _generateParameterGuidance(self, actionName: str, parametersContext: str, @@ -219,12 +218,11 @@ class AdaptiveLearningEngine: if attemptNumber and attemptNumber >= 3: guidanceParts.append(f"Attempt #{attemptNumber}: Adjust parameters based on validation feedback.") - # Generic issues summary commonIssues = failureAnalysis.get('commonIssues', {}) or {} if commonIssues: guidanceParts.append("Address the following parameter issues:") - for issueKey, issueDesc in commonIssues.items(): - guidanceParts.append(f"- {issueKey}: {issueDesc}") + for issueText, count in commonIssues.items(): + guidanceParts.append(f"- {issueText} (occurred {count} time{'s' if count != 1 else ''})") # Keep guidance format stable return "\n".join(guidanceParts) if guidanceParts else "Use standard parameter values." diff --git a/modules/workflows/processing/adaptive/contentValidator.py b/modules/workflows/processing/adaptive/contentValidator.py index fe17572f..e8ba106b 100644 --- a/modules/workflows/processing/adaptive/contentValidator.py +++ b/modules/workflows/processing/adaptive/contentValidator.py @@ -273,16 +273,15 @@ class ContentValidator: elif section.get("content_type") in ["paragraph", "heading"]: if elements and isinstance(elements, list) and len(elements) > 0: textElement = elements[0] - # Ensure textElement is a dictionary before accessing if isinstance(textElement, dict): content = textElement.get("content", {}) - if isinstance(content, dict): - text = content.get("text", "") - else: - text = textElement.get("text", "") - if text: - sectionSummary["textLength"] = len(text) - sectionSummary["wordCount"] = len(text.split()) + if isinstance(content, dict): + text = content.get("text", "") + else: + text = textElement.get("text", "") + if text: + sectionSummary["textLength"] = len(text) + sectionSummary["wordCount"] = len(text.split()) if section.get("textLength"): sectionSummary["textLength"] = section.get("textLength") @@ -290,59 +289,47 @@ class ContentValidator: elif section.get("content_type") == "code_block": if elements and isinstance(elements, list) and len(elements) > 0: codeElement = elements[0] - content = codeElement.get("content", {}) - if isinstance(content, dict): - code = content.get("code", "") - language = content.get("language", "") - if code: - sectionSummary["codeLength"] = len(code) - sectionSummary["codeLineCount"] = code.count('\n') + 1 - if language: - sectionSummary["language"] = language + if isinstance(codeElement, dict): + content = codeElement.get("content", {}) + if isinstance(content, dict): + code = content.get("code", "") + language = content.get("language", "") + if code: + sectionSummary["codeLength"] = len(code) + sectionSummary["codeLineCount"] = code.count('\n') + 1 + if language: + sectionSummary["language"] = language - # Wenn contentPartIds vorhanden sind, aber keine elements: Füge ContentParts-Metadaten hinzu contentPartIds = section.get("contentPartIds", []) if contentPartIds and not elements: - # Prüfe ob contentPartsMetadata vorhanden ist contentPartsMetadata = section.get("contentPartsMetadata", []) if contentPartsMetadata: sectionSummary["contentPartsMetadata"] = contentPartsMetadata else: - # Fallback: Zeige nur IDs wenn Metadaten nicht verfügbar sectionSummary["contentPartIds"] = contentPartIds sectionSummary["note"] = "ContentParts referenced but metadata not available" - # Include any additional fields from section (generic approach) - # BUT exclude type-specific KPIs that don't belong to this content_type - # AND exclude internal planning fields that 
confuse validation contentType = section.get("content_type", "") - # Define KPIs that are ONLY valid for specific types typeExclusiveKpis = { - "table": ["columnCount", "rowCount", "headers"], # Only for tables - "bullet_list": ["itemCount"], # Only for bullet_list - "list": ["itemCount"] # Only for list + "table": ["columnCount", "rowCount", "headers"], + "bullet_list": ["itemCount"], + "list": ["itemCount"] } excludedKpis = [] for kpiType, kpiFields in typeExclusiveKpis.items(): if kpiType != contentType: excludedKpis.extend(kpiFields) - # Internal planning fields that should NOT be shown to validation AI - # These are implementation details, not content indicators internalFields = ["generationHint", "useAiCall", "elements"] for key, value in section.items(): if key not in sectionSummary and key not in internalFields and key not in excludedKpis: - # Don't copy type-specific KPIs if they're 0/empty and we didn't extract them ourselves - # This prevents copying columnCount: 0, rowCount: 0, headers: [] from structure generation phase if key in ["columnCount", "rowCount", "headers", "itemCount"]: - # Skip if it's 0/empty - we'll only include KPIs we extracted from elements if isinstance(value, int) and value == 0: continue if isinstance(value, list) and len(value) == 0: continue - # Include simple types (str, int, float, bool, list of primitives) if isinstance(value, (str, int, float, bool)) or (isinstance(value, list) and len(value) <= 10): sectionSummary[key] = value @@ -486,7 +473,7 @@ class ContentValidator: try: json_str = json.dumps(data) size_bytes = len(json_str.encode('utf-8')) - except: + except (TypeError, ValueError): size_bytes = len(str(data).encode('utf-8')) else: size_bytes = len(str(data).encode('utf-8')) diff --git a/modules/workflows/processing/adaptive/learningEngine.py b/modules/workflows/processing/adaptive/learningEngine.py index 83cf7b13..8fb2f958 100644 --- a/modules/workflows/processing/adaptive/learningEngine.py +++ b/modules/workflows/processing/adaptive/learningEngine.py @@ -16,6 +16,11 @@ class LearningEngine: self.strategies = {} self.feedbackHistory = [] + def reset(self): + """Reset all learned state for a new workflow session.""" + self.strategies.clear() + self.feedbackHistory.clear() + def learnFromFeedback(self, feedback: Dict[str, Any], context: Any, taskIntent: Dict[str, Any]): """Learns from feedback and updates strategies - works on TASK level, not workflow level""" try: diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py index 0e4d6ee4..6b1e3544 100644 --- a/modules/workflows/processing/core/actionExecutor.py +++ b/modules/workflows/processing/core/actionExecutor.py @@ -136,6 +136,7 @@ class ActionExecutor: # Execute action and track success for progress log result = None actionSuccess = False + actionError = None try: result = await self.executeAction( methodName=action.execMethod, @@ -144,23 +145,23 @@ class ActionExecutor: ) actionSuccess = result.success if result else False except Exception as e: - logger.error(f"Error executing action: {str(e)}") + logger.error(f"Error executing action {action.execMethod}.{action.execAction}: {str(e)}") actionSuccess = False + actionError = str(e) finally: - # Finish action progress tracking try: self.services.chat.progressLogFinish(actionOperationId, actionSuccess) except Exception as e: logger.error(f"Error finishing action progress log: {str(e)}") - # If action execution failed, return error result if result is None: - action.setError("Action 
execution failed") + errorMsg = actionError or "Action execution failed" + action.setError(errorMsg) return ActionResult( success=False, documents=[], resultLabel=action.execResultLabel, - error="Action execution failed" + error=errorMsg ) resultLabel = action.execResultLabel diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py index a4ae05e9..48df832d 100644 --- a/modules/workflows/processing/core/messageCreator.py +++ b/modules/workflows/processing/core/messageCreator.py @@ -319,56 +319,27 @@ class MessageCreator: except Exception as e: logger.error(f"Error creating error message: {str(e)}") - def _extractRoundNumberFromLabel(self, label: str) -> int: - """Extract round number from a document label like 'round1_task1_action1_diagram_analysis'""" + def _extractNumberFromLabelPart(self, label: str, prefix: str) -> int: + """Extract number following a prefix in a label like 'round1_task1_action1_context'. + Works for prefix='round', 'task', 'action'. Returns 0 on failure. + """ try: if not label or not isinstance(label, str): return 0 - # Parse label format: round{round}_task{task}_action{action}_{context} - if label.startswith('round'): - roundPart = label.split('_')[0] # Get 'round1' part - if roundPart.startswith('round'): - roundNumber = roundPart[5:] # Remove 'round' prefix - return int(roundNumber) - - return 0 + import re + pattern = rf'{prefix}(\d+)' + match = re.search(pattern, label) + return int(match.group(1)) if match else 0 except Exception as e: - logger.warning(f"Could not extract round number from label '{label}': {str(e)}") + logger.warning(f"Could not extract {prefix} number from label '{label}': {str(e)}") return 0 + def _extractRoundNumberFromLabel(self, label: str) -> int: + return self._extractNumberFromLabelPart(label, 'round') + def _extractTaskNumberFromLabel(self, label: str) -> int: - """Extract task number from a document label like 'round1_task1_action1_diagram_analysis'""" - try: - if not label or not isinstance(label, str): - return 0 - - # Parse label format: round{round}_task{task}_action{action}_{context} - if '_task' in label: - taskPart = label.split('_task')[1] - if taskPart and '_' in taskPart: - taskNumber = taskPart.split('_')[0] - return int(taskNumber) - - return 0 - except Exception as e: - logger.warning(f"Could not extract task number from label '{label}': {str(e)}") - return 0 + return self._extractNumberFromLabelPart(label, 'task') def _extractActionNumberFromLabel(self, label: str) -> int: - """Extract action number from a document label like 'round1_task1_action1_diagram_analysis'""" - try: - if not label or not isinstance(label, str): - return 0 - - # Parse label format: round{round}_task{task}_action{action}_{context} - if '_action' in label: - actionPart = label.split('_action')[1] - if actionPart and '_' in actionPart: - actionNumber = actionPart.split('_')[0] - return int(actionNumber) - - return 0 - except Exception as e: - logger.warning(f"Could not extract action number from label '{label}': {str(e)}") - return 0 + return self._extractNumberFromLabelPart(label, 'action') diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py index b1e1def7..233488fe 100644 --- a/modules/workflows/processing/core/taskPlanner.py +++ b/modules/workflows/processing/core/taskPlanner.py @@ -7,7 +7,6 @@ import json import logging from typing import Dict, Any from modules.datamodels.datamodelChat import TaskStep, TaskContext, TaskPlan, 
WorkflowModeEnum -from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, ProcessingModeEnum, PriorityEnum from modules.workflows.processing.shared.promptGenerationTaskplan import ( generateTaskPlanningPrompt ) @@ -107,17 +106,6 @@ class TaskPlanner: taskPlanningPromptTemplate = bundle.prompt placeholders = bundle.placeholders - # Centralized AI call: Task planning (quality, detailed) with placeholders - options = AiCallOptions( - operationType=OperationTypeEnum.PLAN, - priority=PriorityEnum.QUALITY, - compressPrompt=False, - compressContext=False, - processingMode=ProcessingModeEnum.DETAILED, - maxCost=0.10, - maxProcessingTime=30 - ) - prompt = await self.services.ai.callAiPlanning( prompt=taskPlanningPromptTemplate, placeholders=placeholders, @@ -141,9 +129,11 @@ class TaskPlanner: raise ValueError("Task plan missing 'tasks' field") except Exception as e: logger.error(f"Error parsing task plan response: {str(e)}") - taskPlanDict = {'tasks': []} + raise ValueError(f"Failed to parse AI task plan response: {str(e)}") from e - if not self._validateTaskPlan(taskPlanDict): + from modules.workflows.processing.core.validator import WorkflowValidator + validator = WorkflowValidator(self.services) + if not validator.validateTask(taskPlanDict): logger.error("Generated task plan failed validation") logger.error(f"AI Response: {prompt}") logger.error(f"Parsed Task Plan: {json.dumps(taskPlanDict, indent=2)}") @@ -207,61 +197,4 @@ class TaskPlanner: logger.error(f"Error in generateTaskPlan: {str(e)}") raise - - - def _validateTaskPlan(self, taskPlan: Dict[str, Any]) -> bool: - """Validate task plan structure""" - try: - if not isinstance(taskPlan, dict): - logger.error("Task plan is not a dictionary") - return False - - if 'tasks' not in taskPlan or not isinstance(taskPlan['tasks'], list): - logger.error(f"Task plan missing 'tasks' field or not a list. 
Found: {type(taskPlan.get('tasks', 'MISSING'))}") - return False - - # First pass: collect all task IDs to validate dependencies - taskIds = set() - for task in taskPlan['tasks']: - if not isinstance(task, dict): - logger.error(f"Task is not a dictionary: {type(task)}") - return False - if 'id' not in task: - logger.error(f"Task missing 'id' field: {task}") - return False - taskIds.add(task['id']) - - # Second pass: validate each task - for i, task in enumerate(taskPlan['tasks']): - if not isinstance(task, dict): - logger.error(f"Task {i} is not a dictionary: {type(task)}") - return False - - requiredFields = ['id', 'objective', 'successCriteria'] - missingFields = [field for field in requiredFields if field not in task] - if missingFields: - logger.error(f"Task {i} missing required fields: {missingFields}") - return False - - # Check for duplicate IDs (shouldn't happen after first pass, but safety check) - if task['id'] in taskIds and list(taskPlan['tasks']).count(task['id']) > 1: - logger.error(f"Task {i} has duplicate ID: {task['id']}") - return False - - dependencies = task.get('dependencies', []) - if not isinstance(dependencies, list): - logger.error(f"Task {i} dependencies is not a list: {type(dependencies)}") - return False - - for dep in dependencies: - if dep not in taskIds and dep != 'task_0': - logger.error(f"Task {i} has invalid dependency: {dep} (available: {list(taskIds) + ['task_0']})") - return False - - logger.info(f"Task plan validation successful with {len(taskIds)} tasks") - return True - - except Exception as e: - logger.error(f"Error validating task plan: {str(e)}") - return False \ No newline at end of file diff --git a/modules/workflows/processing/core/validator.py b/modules/workflows/processing/core/validator.py index 74d67b19..67c685e8 100644 --- a/modules/workflows/processing/core/validator.py +++ b/modules/workflows/processing/core/validator.py @@ -25,40 +25,35 @@ class WorkflowValidator: logger.error(f"Task plan missing 'tasks' field or not a list. 
Found: {type(taskPlan.get('tasks', 'MISSING'))}") return False - # First pass: collect all task IDs to validate dependencies + # Single pass: collect IDs (detect duplicates) and validate each task taskIds = set() - for task in taskPlan['tasks']: - if not isinstance(task, dict): - logger.error(f"Task is not a dictionary: {type(task)}") - return False - if 'id' not in task: - logger.error(f"Task missing 'id' field: {task}") - return False - taskIds.add(task['id']) - - # Second pass: validate each task for i, task in enumerate(taskPlan['tasks']): if not isinstance(task, dict): logger.error(f"Task {i} is not a dictionary: {type(task)}") return False + if 'id' not in task: + logger.error(f"Task {i} missing 'id' field: {task}") + return False + + if task['id'] in taskIds: + logger.error(f"Task {i} has duplicate ID: {task['id']}") + return False + taskIds.add(task['id']) requiredFields = ['id', 'objective', 'successCriteria'] missingFields = [field for field in requiredFields if field not in task] if missingFields: logger.error(f"Task {i} missing required fields: {missingFields}") return False - - # Check for duplicate IDs (shouldn't happen after first pass, but safety check) - if task['id'] in taskIds and list(taskPlan['tasks']).count(task['id']) > 1: - logger.error(f"Task {i} has duplicate ID: {task['id']}") - return False dependencies = task.get('dependencies', []) if not isinstance(dependencies, list): logger.error(f"Task {i} dependencies is not a list: {type(dependencies)}") return False - - for dep in dependencies: + + # Second pass: validate dependencies (all IDs now known) + for i, task in enumerate(taskPlan['tasks']): + for dep in task.get('dependencies', []): if dep not in taskIds and dep != 'task_0': logger.error(f"Task {i} has invalid dependency: {dep} (available: {list(taskIds) + ['task_0']})") return False @@ -93,7 +88,7 @@ class WorkflowValidator: missingFields = [] for field in requiredFields: - if field not in action or not action[field]: + if field not in action or action[field] is None: missingFields.append(field) if missingFields: logger.error(f"Action {i} missing required fields: {missingFields}") diff --git a/modules/workflows/processing/modes/modeAutomation.py b/modules/workflows/processing/modes/modeAutomation.py index 4e3c7853..1d0121b9 100644 --- a/modules/workflows/processing/modes/modeAutomation.py +++ b/modules/workflows/processing/modes/modeAutomation.py @@ -36,6 +36,9 @@ class AutomationMode(BaseMode): - Or as direct JSON in userInput """ try: + # Reset action map to prevent state leaks from previous runs + self.taskActionMap = {} + # AUTOMATION mode ALWAYS requires a JSON plan to be provided in userInput # Try to extract plan from userInput (embedded JSON or direct JSON) templatePlan = None @@ -340,78 +343,6 @@ class AutomationMode(BaseMode): error=str(e) ) - def _createActionItem(self, actionData: Dict[str, Any]) -> Optional[ActionItem]: - """Create ActionItem from action data""" - try: - import uuid - from datetime import datetime, timezone - - # Ensure ID is present - if "id" not in actionData or not actionData["id"]: - actionData["id"] = f"action_{uuid.uuid4()}" - - # Ensure required fields - if "status" not in actionData: - actionData["status"] = TaskStatus.PENDING - - if "execMethod" not in actionData: - logger.error("execMethod is required for task action") - return None - - if "execAction" not in actionData: - logger.error("execAction is required for task action") - return None - - if "execParameters" not in actionData: - actionData["execParameters"] = {} - 
- # Use generic field separation based on ActionItem model - simpleFields, objectFields = self.services.interfaceDbChat._separateObjectFields(ActionItem, actionData) - - # Create action in database - createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields) - - # Convert to ActionItem model - return ActionItem( - id=createdAction["id"], - execMethod=createdAction["execMethod"], - execAction=createdAction["execAction"], - execParameters=createdAction.get("execParameters", {}), - execResultLabel=createdAction.get("execResultLabel"), - expectedDocumentFormats=createdAction.get("expectedDocumentFormats"), - status=createdAction.get("status", TaskStatus.PENDING), - error=createdAction.get("error"), - retryCount=createdAction.get("retryCount", 0), - retryMax=createdAction.get("retryMax", 3), - processingTime=createdAction.get("processingTime"), - timestamp=parseTimestamp(createdAction.get("timestamp"), default=self.services.utils.timestampGetUtc()), - result=createdAction.get("result"), - userMessage=createdAction.get("userMessage") - ) - - except Exception as e: - logger.error(f"Error creating task action: {str(e)}") - return None - - def _updateWorkflowBeforeExecutingTask(self, taskNumber: int): - """Update workflow object before executing a task""" - try: - workflow = self.services.workflow - updateData = { - "currentTask": taskNumber, - "currentAction": 0, - "totalActions": 0 - } - - workflow.currentTask = taskNumber - workflow.currentAction = 0 - workflow.totalActions = 0 - - self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData) - logger.info(f"Updated workflow {workflow.id} before executing task {taskNumber}") - except Exception as e: - logger.error(f"Error updating workflow before executing task: {str(e)}") - def _updateWorkflowAfterActionPlanning(self, totalActions: int): """Update workflow object after action planning""" try: @@ -423,17 +354,6 @@ class AutomationMode(BaseMode): except Exception as e: logger.error(f"Error updating workflow after action planning: {str(e)}") - def _updateWorkflowBeforeExecutingAction(self, actionNumber: int): - """Update workflow object before executing an action""" - try: - workflow = self.services.workflow - updateData = {"currentAction": actionNumber} - workflow.currentAction = actionNumber - self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData) - logger.info(f"Updated workflow {workflow.id} before executing action {actionNumber}") - except Exception as e: - logger.error(f"Error updating workflow before executing action: {str(e)}") - def _setWorkflowTotals(self, totalTasks: int = None, totalActions: int = None): """Set total counts for workflow progress tracking""" try: diff --git a/modules/workflows/processing/modes/modeBase.py b/modules/workflows/processing/modes/modeBase.py index 770c868a..fe9a5da6 100644 --- a/modules/workflows/processing/modes/modeBase.py +++ b/modules/workflows/processing/modes/modeBase.py @@ -4,14 +4,16 @@ # Abstract base class for workflow modes from abc import ABC, abstractmethod +import uuid import logging -from typing import List, Dict, Any -from modules.datamodels.datamodelChat import TaskStep, TaskContext, TaskResult, ActionItem +from typing import List, Dict, Any, Optional +from modules.datamodels.datamodelChat import TaskStep, TaskContext, TaskResult, ActionItem, TaskStatus from modules.datamodels.datamodelChat import ChatWorkflow from modules.workflows.processing.core.taskPlanner import TaskPlanner from modules.workflows.processing.core.actionExecutor import 
ActionExecutor from modules.workflows.processing.core.messageCreator import MessageCreator from modules.workflows.processing.core.validator import WorkflowValidator +from modules.shared.timeUtils import parseTimestamp logger = logging.getLogger(__name__) @@ -44,3 +46,75 @@ class BaseMode(ABC): async def createTaskPlanMessage(self, taskPlan, workflow: ChatWorkflow): """Create task plan message - common to all modes""" return await self.messageCreator.createTaskPlanMessage(taskPlan, workflow) + + def _createActionItem(self, actionData: Dict[str, Any]) -> Optional[ActionItem]: + """Create an ActionItem from action data, persist to DB, and return the model instance""" + try: + if "id" not in actionData or not actionData["id"]: + actionData["id"] = f"action_{uuid.uuid4()}" + + if "status" not in actionData: + actionData["status"] = TaskStatus.PENDING + + if "execMethod" not in actionData: + logger.error("execMethod is required for task action") + return None + + if "execAction" not in actionData: + logger.error("execAction is required for task action") + return None + + if "execParameters" not in actionData: + actionData["execParameters"] = {} + + simpleFields, objectFields = self.services.interfaceDbChat._separateObjectFields(ActionItem, actionData) + createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields) + + return ActionItem( + id=createdAction["id"], + execMethod=createdAction["execMethod"], + execAction=createdAction["execAction"], + execParameters=createdAction.get("execParameters", {}), + execResultLabel=createdAction.get("execResultLabel"), + expectedDocumentFormats=createdAction.get("expectedDocumentFormats"), + status=createdAction.get("status", TaskStatus.PENDING), + error=createdAction.get("error"), + retryCount=createdAction.get("retryCount", 0), + retryMax=createdAction.get("retryMax", 3), + processingTime=createdAction.get("processingTime"), + timestamp=parseTimestamp(createdAction.get("timestamp"), default=self.services.utils.timestampGetUtc()), + result=createdAction.get("result"), + userMessage=createdAction.get("userMessage") + ) + + except Exception as e: + logger.error(f"Error creating task action: {str(e)}") + return None + + def _updateWorkflowBeforeExecutingTask(self, taskNumber: int): + """Update workflow state before executing a task""" + try: + workflow = self.services.workflow + updateData = { + "currentTask": taskNumber, + "currentAction": 0, + "totalActions": 0 + } + workflow.currentTask = taskNumber + workflow.currentAction = 0 + workflow.totalActions = 0 + self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData) + logger.info(f"Updated workflow {workflow.id} before executing task {taskNumber}") + except Exception as e: + logger.error(f"Error updating workflow before executing task: {str(e)}") + + def _updateWorkflowBeforeExecutingAction(self, actionNumber: int): + """Update workflow state before executing an action""" + try: + workflow = self.services.workflow + updateData = {"currentAction": actionNumber} + workflow.currentAction = actionNumber + self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData) + logger.info(f"Updated workflow {workflow.id} before executing action {actionNumber}") + except Exception as e: + logger.error(f"Error updating workflow before executing action: {str(e)}") diff --git a/modules/workflows/processing/modes/modeDynamic.py b/modules/workflows/processing/modes/modeDynamic.py index e59a9253..eeae30e5 100644 --- a/modules/workflows/processing/modes/modeDynamic.py +++ 
b/modules/workflows/processing/modes/modeDynamic.py @@ -116,6 +116,7 @@ class DynamicMode(BaseMode): step = 1 decision = None + lastStepFailed = False while step <= state.max_steps: checkWorkflowStopped(self.services) @@ -282,6 +283,7 @@ class DynamicMode(BaseMode): except Exception as e: logger.error(f"Dynamic step {step} error: {e}") + lastStepFailed = True break # NEW: Use adaptive stopping logic @@ -296,19 +298,24 @@ class DynamicMode(BaseMode): step += 1 # Summarize task result for dynamic mode - status = TaskStatus.COMPLETED - success = True - # Get feedback from last decision if available lastDecision = context.previousReviewResult[-1] if hasattr(context, 'previousReviewResult') and context.previousReviewResult else None feedback = lastDecision.reason if lastDecision and isinstance(lastDecision, ReviewResult) else 'Completed' - if lastDecision and isinstance(lastDecision, ReviewResult) and lastDecision.status == 'success': + + if lastStepFailed: + status = TaskStatus.FAILED + success = False + elif lastDecision and isinstance(lastDecision, ReviewResult) and lastDecision.status in ('stop', 'failed'): + status = TaskStatus.FAILED + success = False + else: + status = TaskStatus.COMPLETED success = True # Create proper ReviewResult for completion message completionReviewResult = ReviewResult( - status='success', + status='success' if success else 'failed', reason=feedback, - qualityScore=lastDecision.qualityScore if lastDecision and isinstance(lastDecision, ReviewResult) else 8.0, + qualityScore=lastDecision.qualityScore if lastDecision and isinstance(lastDecision, ReviewResult) else (8.0 if success else 2.0), metCriteria=[], improvements=[] ) @@ -1003,12 +1010,15 @@ class DynamicMode(BaseMode): # Detect repeated actions actionCounts = {} for entry in actionHistory: - # Extract action name (after first space, before next space or {) - parts = entry.split() - if len(parts) > 1: - # Skip "Step", "Refinement" prefixes and get the action name - actionName = parts[1] if parts[0] in ['Step', 'Refinement'] else parts[0] - actionCounts[actionName] = actionCounts.get(actionName, 0) + 1 + # Format: "Step N: actionName ..." or "Refinement N: actionName ..." 
+ # Extract the action name after "prefix N:" + colonIdx = entry.find(':') + if colonIdx >= 0: + afterColon = entry[colonIdx + 1:].strip().split() + actionName = afterColon[0] if afterColon else 'unknown' + else: + actionName = entry.split()[0] if entry.split() else 'unknown' + actionCounts[actionName] = actionCounts.get(actionName, 0) + 1 repeatedActions = [action for action, count in actionCounts.items() if count >= 2] if repeatedActions: @@ -1172,150 +1182,6 @@ Return only the user-friendly message, no technical details.""" logger.error(f"Error generating action result message: {str(e)}") return f"{method}.{actionName} action completed" - def _createActionItem(self, actionData: Dict[str, Any]) -> ActionItem: - """Creates a new task action for Dynamic mode""" - try: - import uuid - - # Ensure ID is present - if "id" not in actionData or not actionData["id"]: - actionData["id"] = f"action_{uuid.uuid4()}" - - # Ensure required fields - if "status" not in actionData: - actionData["status"] = TaskStatus.PENDING - - if "execMethod" not in actionData: - logger.error("execMethod is required for task action") - return None - - if "execAction" not in actionData: - logger.error("execAction is required for task action") - return None - - if "execParameters" not in actionData: - actionData["execParameters"] = {} - - # Use generic field separation based on ActionItem model - simpleFields, objectFields = self.services.interfaceDbChat._separateObjectFields(ActionItem, actionData) - - # Create action in database - createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields) - - # Convert to ActionItem model - return ActionItem( - id=createdAction["id"], - execMethod=createdAction["execMethod"], - execAction=createdAction["execAction"], - execParameters=createdAction.get("execParameters", {}), - execResultLabel=createdAction.get("execResultLabel"), - expectedDocumentFormats=createdAction.get("expectedDocumentFormats"), - status=createdAction.get("status", TaskStatus.PENDING), - error=createdAction.get("error"), - retryCount=createdAction.get("retryCount", 0), - retryMax=createdAction.get("retryMax", 3), - processingTime=createdAction.get("processingTime"), - timestamp=parseTimestamp(createdAction.get("timestamp"), default=self.services.utils.timestampGetUtc()), - result=createdAction.get("result"), - resultDocuments=createdAction.get("resultDocuments", []), - userMessage=createdAction.get("userMessage") - ) - - except Exception as e: - logger.error(f"Error creating task action: {str(e)}") - return None - def _updateWorkflowBeforeExecutingTask(self, taskNumber: int): - """Update workflow object before executing a task""" - try: - workflow = self.services.workflow - updateData = { - "currentTask": taskNumber, - "currentAction": 0, - "totalActions": 0 - } - - # Update workflow object - workflow.currentTask = taskNumber - workflow.currentAction = 0 - workflow.totalActions = 0 - - # Update in database - self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData) - logger.info(f"Updated workflow {workflow.id} before executing task {taskNumber}: {updateData}") - - except Exception as e: - logger.error(f"Error updating workflow before executing task: {str(e)}") - def _updateWorkflowBeforeExecutingAction(self, actionNumber: int): - """Update workflow object before executing an action""" - try: - workflow = self.services.workflow - updateData = { - "currentAction": actionNumber - } - - # Update workflow object - workflow.currentAction = actionNumber - - # Update in database - 
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData) - logger.info(f"Updated workflow {workflow.id} before executing action {actionNumber}: {updateData}") - - except Exception as e: - logger.error(f"Error updating workflow before executing action: {str(e)}") - - def _createActionItem(self, actionData: Dict[str, Any]) -> ActionItem: - """Creates a new task action for Dynamic mode""" - try: - import uuid - - # Ensure ID is present - if "id" not in actionData or not actionData["id"]: - actionData["id"] = f"action_{uuid.uuid4()}" - - # Ensure required fields - if "status" not in actionData: - actionData["status"] = TaskStatus.PENDING - - if "execMethod" not in actionData: - logger.error("execMethod is required for task action") - return None - - if "execAction" not in actionData: - logger.error("execAction is required for task action") - return None - - if "execParameters" not in actionData: - actionData["execParameters"] = {} - - # Use generic field separation based on ActionItem model - simpleFields, objectFields = self.services.interfaceDbChat._separateObjectFields(ActionItem, actionData) - - # Create action in database - createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields) - - # Convert to ActionItem model - return ActionItem( - id=createdAction["id"], - execMethod=createdAction["execMethod"], - execAction=createdAction["execAction"], - execParameters=createdAction.get("execParameters", {}), - execResultLabel=createdAction.get("execResultLabel"), - expectedDocumentFormats=createdAction.get("expectedDocumentFormats"), - status=createdAction.get("status", TaskStatus.PENDING), - error=createdAction.get("error"), - retryCount=createdAction.get("retryCount", 0), - retryMax=createdAction.get("retryMax", 3), - processingTime=createdAction.get("processingTime"), - timestamp=parseTimestamp(createdAction.get("timestamp"), default=self.services.utils.timestampGetUtc()), - result=createdAction.get("result"), - resultDocuments=createdAction.get("resultDocuments", []), - userMessage=createdAction.get("userMessage") - ) - - except Exception as e: - logger.error(f"Error creating task action: {str(e)}") - return None - diff --git a/modules/workflows/processing/shared/executionState.py b/modules/workflows/processing/shared/executionState.py index 1cdf0d53..e5e48a01 100644 --- a/modules/workflows/processing/shared/executionState.py +++ b/modules/workflows/processing/shared/executionState.py @@ -5,23 +5,22 @@ import logging from typing import List, Optional -from modules.datamodels.datamodelChat import TaskStep, ActionResult, Observation +from modules.datamodels.datamodelChat import TaskStep, ActionResult logger = logging.getLogger(__name__) class TaskExecutionState: """Manages execution state for a task with retry logic""" - def __init__(self, task_step: TaskStep): - self.task_step = task_step - self.successful_actions: List[ActionResult] = [] # Preserved across retries - self.failed_actions: List[ActionResult] = [] # For analysis + def __init__(self, taskStep: TaskStep): + self.task_step = taskStep + self.successful_actions: List[ActionResult] = [] + self.failed_actions: List[ActionResult] = [] self.current_action_index = 0 self.retry_count = 0 self.max_retries = 3 - # Iterative loop (dynamic mode) self.current_step = 0 - self.max_steps = 0 # Will be overridden by workflow.maxSteps from workflowManager.py + self.max_steps = 0 def addSuccessfulAction(self, action_result: ActionResult): """Add a successful action to the state""" @@ -58,48 +57,25 @@ class 
TaskExecutionState: patterns.append("permission_issues") return list(set(patterns)) -def shouldContinue(observation: Optional[Observation], review=None, current_step: int = 0, max_steps: int = 1) -> bool: - """Helper to decide if the iterative loop should continue +def shouldContinue(observation=None, review=None, current_step: int = 0, max_steps: int = 1) -> bool: + """Helper to decide if the iterative loop should continue. - Args: - observation: Observation Pydantic model with action execution results - review: ReviewResult or dict with review decision (optional) - current_step: Current step number in the iteration - max_steps: Maximum allowed steps - - Returns: - bool: True if loop should continue, False if should stop - - Logic: - - Stop if max steps reached - - Stop if review indicates 'stop' or success criteria are met - - Continue if observation indicates failure but allow one more step (caller caps by max_steps) + Returns False if max steps reached or review indicates 'stop'/'success'. """ try: - # Stop if max steps reached if current_step >= max_steps: logger.info(f"Stopping workflow: reached max_steps limit ({current_step} >= {max_steps})") return False - # Check review decision (can be ReviewResult model or dict) if review: if hasattr(review, 'status'): - # ReviewResult Pydantic model if review.status in ('stop', 'success'): return False elif isinstance(review, dict): - # Legacy dict format decision = review.get('decision') or review.get('status') if decision in ('stop', 'success'): return False - # Check observation: if hard failure with no documents, allow one more step - # The caller will enforce max_steps limit - if observation: - if observation.success is False and observation.documentsCount == 0: - # Allow next step once; the caller caps by max_steps - return True - return True except Exception as e: logger.warning(f"Error in shouldContinue: {e}") diff --git a/modules/workflows/processing/shared/methodDiscovery.py b/modules/workflows/processing/shared/methodDiscovery.py index e3bfa769..b8403b3d 100644 --- a/modules/workflows/processing/shared/methodDiscovery.py +++ b/modules/workflows/processing/shared/methodDiscovery.py @@ -19,117 +19,57 @@ methods = {} def discoverMethods(serviceCenter): """Dynamically discover all method classes and their actions in modules methods package. - CRITICAL: If methods are already discovered, updates their Services reference to ensure - they use the current workflow (self.services.workflow). This prevents stale workflow IDs - from being used when a new workflow starts. + Always creates fresh method instances bound to the given serviceCenter, + preventing stale or cross-workflow service references. 
""" + global methods try: - # Import the methods package methodsPackage = importlib.import_module('modules.workflows.methods') - # Discover all modules and packages in the methods package + # Clear and rebuild to prevent cross-workflow state contamination + methods.clear() + uniqueCount = 0 + for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__): if name.startswith('method'): try: - if isPkg: - # Package (folder) - import __init__.py which exports the Method class - module = importlib.import_module(f'modules.workflows.methods.{name}') - else: - # Module (file) - import directly - module = importlib.import_module(f'modules.workflows.methods.{name}') + module = importlib.import_module(f'modules.workflows.methods.{name}') - # Find all classes in the module that inherit from MethodBase for itemName, item in inspect.getmembers(module): if (inspect.isclass(item) and issubclass(item, MethodBase) and item != MethodBase): - # Check if method already exists in cache shortName = itemName.replace('Method', '').lower() - if itemName in methods or shortName in methods: - # Method already discovered - update Services reference to use current workflow - existingMethodInfo = methods.get(itemName) or methods.get(shortName) - if existingMethodInfo and existingMethodInfo.get('instance'): - existingMethodInfo['instance'].services = serviceCenter - logger.debug(f"Updated Services reference for cached method {itemName} to use current workflow") - else: - # Method exists but instance is missing - recreate it - methodInstance = item(serviceCenter) - actions = methodInstance.actions - methodInfo = { - 'instance': methodInstance, - 'actions': actions, - 'description': item.__doc__ or f"Method {itemName}" - } - methods[itemName] = methodInfo - methods[shortName] = methodInfo - logger.info(f"Recreated method {itemName} (short: {shortName}) with {len(actions)} actions") - else: - # Method not discovered yet - create new instance - methodInstance = item(serviceCenter) - - # Use the actions property from MethodBase which handles WorkflowActionDefinition - actions = methodInstance.actions - - # Create method info - methodInfo = { - 'instance': methodInstance, - 'actions': actions, - 'description': item.__doc__ or f"Method {itemName}" - } - - # Store the method with full class name - methods[itemName] = methodInfo - - # Also store with short name for action executor access - methods[shortName] = methodInfo - - logger.info(f"Discovered method {itemName} (short: {shortName}) with {len(actions)} actions") + + # Skip if already processed (via another module path) + if itemName in methods: + continue + + methodInstance = item(serviceCenter) + actions = methodInstance.actions + + methodInfo = { + 'instance': methodInstance, + 'actions': actions, + 'description': item.__doc__ or f"Method {itemName}" + } + + methods[itemName] = methodInfo + methods[shortName] = methodInfo + uniqueCount += 1 + + logger.info(f"Discovered method {itemName} (short: {shortName}) with {len(actions)} actions") except Exception as e: logger.error(f"Error discovering method {name}: {str(e)}") continue - logger.info(f"Discovered/updated {len(methods)} method entries total") + logger.info(f"Discovered {uniqueCount} unique methods ({len(methods)} entries with aliases)") except Exception as e: logger.error(f"Error discovering methods: {str(e)}") -def getMethodsList(serviceCenter): - """Get a list of available methods with their signatures""" - if not methods: - discoverMethods(serviceCenter) - - methodsList = [] - for methodName, methodInfo in 
methods.items(): - methodDescription = methodInfo['description'] - actionsList = [] - - for actionName, actionInfo in methodInfo['actions'].items(): - actionDescription = actionInfo['description'] - parameters = actionInfo['parameters'] - - # Build parameter signature - paramSig = [] - for paramName, paramInfo in parameters.items(): - paramType = paramInfo['type'] - paramRequired = paramInfo['required'] - paramDefault = paramInfo['default'] - - if paramRequired: - paramSig.append(f"{paramName}: {paramType}") - else: - defaultStr = f" = {paramDefault}" if paramDefault is not None else " = None" - paramSig.append(f"{paramName}: {paramType}{defaultStr}") - - paramSignature = f"({', '.join(paramSig)})" if paramSig else "()" - actionsList.append(f"- {actionName}{paramSignature}: {actionDescription}") - - actionsStr = "\n".join(actionsList) - methodsList.append(f"**{methodName}**: {methodDescription}\n{actionsStr}") - - return "\n\n".join(methodsList) - def getActionParameterList(methodName: str, actionName: str, methods: Dict[str, Any]) -> str: """Get action parameter list from WorkflowActionParameter structure for AI parameter generation (list only).""" try: diff --git a/modules/workflows/processing/shared/placeholderFactory.py b/modules/workflows/processing/shared/placeholderFactory.py index 136dd2cb..3d1a9d83 100644 --- a/modules/workflows/processing/shared/placeholderFactory.py +++ b/modules/workflows/processing/shared/placeholderFactory.py @@ -39,6 +39,26 @@ from typing import Dict, Any, List logger = logging.getLogger(__name__) from modules.workflows.processing.shared.methodDiscovery import (methods, discoverMethods) +from modules.datamodels.datamodelChat import Observation + + +def _observationToDict(obs) -> dict: + """Convert an Observation (Pydantic model or dict) to a plain dict.""" + if isinstance(obs, dict): + return obs.copy() + if hasattr(obs, 'model_dump'): + return obs.model_dump(exclude_none=True) + if hasattr(obs, 'dict'): + return obs.dict() + return {"raw": str(obs)} + + +def _redactSnippets(obsDict: dict): + """Replace large snippet strings with a metadata indicator.""" + if 'previews' in obsDict and isinstance(obsDict['previews'], list): + for preview in obsDict['previews']: + if isinstance(preview, dict) and 'snippet' in preview: + preview['snippet'] = f"[Content: {len(preview.get('snippet', ''))} characters]" def extractUserPrompt(context: Any) -> str: """Extract user prompt from context. Maps to {{KEY:USER_PROMPT}}. @@ -71,22 +91,17 @@ def extractUserPrompt(context: Any) -> str: def extractNormalizedRequest(services: Any) -> str: """Extract normalized user request from services. Maps to {{KEY:NORMALIZED_REQUEST}}. Returns the full normalized request from user input analysis (preserves all constraints and details). - CRITICAL: Must return the actual normalizedRequest from analysis, NOT intent. 
""" try: - # Get normalized request from currentUserPromptNormalized (stores the normalizedRequest from analysis) if services and getattr(services, 'currentUserPromptNormalized', None): normalized = services.currentUserPromptNormalized - # Validate that it's not the intent (which is shorter and less detailed) - # Intent is typically a concise objective, normalized request should be longer and more detailed workflowIntent = getattr(services.workflow, '_workflowIntent', {}) if hasattr(services, 'workflow') and services.workflow else {} intent = workflowIntent.get('intent', '') - # If normalized matches intent exactly, it's wrong - log warning if intent and normalized == intent: logger.warning(f"extractNormalizedRequest: normalized request matches intent - this is incorrect! normalized={normalized[:100]}...") - # Try to get from workflow intent or return error message - return f"ERROR: Normalized request not properly stored. Expected detailed request, got intent: {intent}" + # Fall back to intent rather than injecting an error string into the LLM prompt + return intent return normalized @@ -346,49 +361,12 @@ def extractReviewContent(context: Any) -> str: return result_summary elif hasattr(context, 'observation') and context.observation: - # For observation data, show full content but handle documents specially - # Handle both Pydantic Observation model and dict format - from modules.datamodels.datamodelChat import Observation - - if isinstance(context.observation, Observation): - # Convert Pydantic model to dict - obs_dict = context.observation.model_dump(exclude_none=True) if hasattr(context.observation, 'model_dump') else context.observation.dict() - elif isinstance(context.observation, dict): - obs_dict = context.observation.copy() - else: - # Fallback: try to serialize as-is - obs_dict = context.observation.model_dump(exclude_none=True) if hasattr(context.observation, 'model_dump') else context.observation.dict() - - # If there are previews with documents, show only metadata - if 'previews' in obs_dict and isinstance(obs_dict['previews'], list): - for preview in obs_dict['previews']: - if isinstance(preview, dict) and 'snippet' in preview: - # Replace snippet with metadata indicator - preview['snippet'] = f"[Content: {len(preview.get('snippet', ''))} characters]" - + obs_dict = _observationToDict(context.observation) + _redactSnippets(obs_dict) return json.dumps(obs_dict, indent=2, ensure_ascii=False) elif hasattr(context, 'stepResult') and context.stepResult and 'observation' in context.stepResult: - # For observation data in stepResult, show full content but handle documents specially - observation = context.stepResult['observation'] - # Handle both Pydantic Observation model and dict format - from modules.datamodels.datamodelChat import Observation - - if isinstance(observation, Observation): - # Convert Pydantic model to dict - obs_dict = observation.model_dump(exclude_none=True) if hasattr(observation, 'model_dump') else observation.dict() - elif isinstance(observation, dict): - obs_dict = observation.copy() - else: - # Fallback: try to serialize - obs_dict = observation.model_dump(exclude_none=True) if hasattr(observation, 'model_dump') else observation.dict() - - # If there are previews with documents, show only metadata - if 'previews' in obs_dict and isinstance(obs_dict['previews'], list): - for preview in obs_dict['previews']: - if isinstance(preview, dict) and 'snippet' in preview: - # Replace snippet with metadata indicator - preview['snippet'] = f"[Content: 
{len(preview.get('snippet', ''))} characters]" - + obs_dict = _observationToDict(context.stepResult['observation']) + _redactSnippets(obs_dict) return json.dumps(obs_dict, indent=2, ensure_ascii=False) else: return "No review content available" @@ -449,41 +427,22 @@ def extractLatestRefinementFeedback(context: Any) -> str: CRITICAL: If ERROR level logs are found, refinement should stop processing. """ try: - # First check for ERROR level logs in workflow - if hasattr(context, 'workflow') and context.workflow: - try: - import modules.interfaces.interfaceDbChat as interfaceDbChat - from modules.interfaces.interfaceDbApp import getRootInterface - rootInterface = getRootInterface() - interfaceDbChat = interfaceDbChat.getInterface(rootInterface.currentUser) - - # Get workflow logs - chatData = interfaceDbChat.getUnifiedChatData(context.workflow.id, None) - logs = chatData.get("logs", []) - - # Check for ERROR level logs - for log in logs: - if isinstance(log, dict): - log_level = log.get("level", "").upper() - log_message = str(log.get("message", "")) - if log_level == "ERROR" or "ERROR" in log_message.upper(): - return f"CRITICAL: Processing stopped due to ERROR in logs: {log_message[:200]}" - except Exception as log_check_error: - # If we can't check logs, continue with normal feedback extraction - logger.warning(f"Could not check for ERROR logs: {str(log_check_error)}") - if not hasattr(context, 'previousReviewResult') or not context.previousReviewResult or not isinstance(context.previousReviewResult, list): return "No previous refinement feedback available" - # Get the most recent refinement decision + # Get the most recent refinement decision (supports both ReviewResult objects and dicts) latest_decision = context.previousReviewResult[-1] - if not isinstance(latest_decision, dict): + + # Normalize to dict if it's a Pydantic model (e.g. 
ReviewResult) + if hasattr(latest_decision, 'model_dump'): + latest_decision = latest_decision.model_dump() + elif not isinstance(latest_decision, dict): return "No previous refinement feedback available" feedback_parts = [] - # Add decision and reason - decision = latest_decision.get('decision', 'unknown') + # Add decision and reason (ReviewResult uses 'status', legacy uses 'decision') + decision = latest_decision.get('status') or latest_decision.get('decision', 'unknown') reason = latest_decision.get('reason', 'No reason provided') feedback_parts.append(f"Latest Decision: {decision}") feedback_parts.append(f"Reason: {reason}") diff --git a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py index 31878033..dee1cc1f 100644 --- a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py +++ b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py @@ -46,12 +46,19 @@ def generateDynamicPlanSelectionPrompt(services, context: Any, learningEngine=No adaptiveContext = learningEngine.getAdaptiveContextForActionSelection(workflowId, userPrompt) if adaptiveContext: - # Add learning-aware placeholders placeholders.extend([ PromptPlaceholder(label="ADAPTIVE_GUIDANCE", content=adaptiveContext.get('adaptiveGuidance', ''), summaryAllowed=True), PromptPlaceholder(label="FAILURE_ANALYSIS", content=json.dumps(adaptiveContext.get('failureAnalysis', {}), indent=2), summaryAllowed=True), PromptPlaceholder(label="ESCALATION_LEVEL", content=adaptiveContext.get('escalationLevel', 'low'), summaryAllowed=False), ]) + + # Always provide these placeholders so template tokens don't leak into the LLM prompt + if not adaptiveContext: + placeholders.extend([ + PromptPlaceholder(label="ADAPTIVE_GUIDANCE", content="", summaryAllowed=True), + PromptPlaceholder(label="FAILURE_ANALYSIS", content="", summaryAllowed=True), + PromptPlaceholder(label="ESCALATION_LEVEL", content="low", summaryAllowed=False), + ]) template = """Select exactly one next action to advance the task incrementally. @@ -60,7 +67,8 @@ CONTEXT: {{KEY:OVERALL_TASK_CONTEXT}} OBJECTIVE: {{KEY:TASK_OBJECTIVE}} === AVAILABLE RESOURCES === -AVAILABLE_DOCUMENTS_INDEX: {{KEY:AVAILABLE_DOCUMENTS_SUMMARY}} +AVAILABLE_DOCUMENTS_SUMMARY: {{KEY:AVAILABLE_DOCUMENTS_SUMMARY}} +AVAILABLE_DOCUMENTS_INDEX: {{KEY:AVAILABLE_DOCUMENTS_INDEX}} AVAILABLE_CONNECTIONS_INDEX: {{KEY:AVAILABLE_CONNECTIONS_INDEX}} @@ -227,6 +235,13 @@ Excludes documents/connections/history entirely. PromptPlaceholder(label="ATTEMPT_NUMBER", content=str(adaptiveContext.get('attemptNumber', 1)), summaryAllowed=False), PromptPlaceholder(label="FAILURE_ANALYSIS", content=json.dumps(adaptiveContext.get('failureAnalysis', {}), indent=2), summaryAllowed=True), ]) + + if not adaptiveContext: + placeholders.extend([ + PromptPlaceholder(label="PARAMETER_GUIDANCE", content="", summaryAllowed=True), + PromptPlaceholder(label="ATTEMPT_NUMBER", content="1", summaryAllowed=False), + PromptPlaceholder(label="FAILURE_ANALYSIS", content="", summaryAllowed=True), + ]) template = """You are a parameter generator. Set the parameters for this specific action. 
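[Editor's note - illustrative sketch, not part of the patch] The fallback block added above ensures the ADAPTIVE_GUIDANCE, FAILURE_ANALYSIS and ESCALATION_LEVEL placeholders are always supplied, so unresolved {{KEY:...}} tokens cannot leak into the LLM prompt when no adaptive context exists. The minimal, self-contained Python sketch below demonstrates that failure mode and the fix; Placeholder and render() are simplified stand-ins (assumptions) for the real PromptPlaceholder model and template renderer.

import re
from dataclasses import dataclass

@dataclass
class Placeholder:
    # Simplified stand-in for PromptPlaceholder (assumption; the real model has more fields)
    label: str
    content: str

def render(template: str, placeholders: list) -> str:
    # Replace {{KEY:NAME}} tokens; unknown tokens are left in place, which is the leak
    values = {p.label: p.content for p in placeholders}
    return re.sub(r"\{\{KEY:(\w+)\}\}", lambda m: values.get(m.group(1), m.group(0)), template)

template = "GUIDANCE: {{KEY:ADAPTIVE_GUIDANCE}} | ESCALATION: {{KEY:ESCALATION_LEVEL}}"
print(render(template, []))   # no placeholders -> tokens survive verbatim and reach the LLM
print(render(template, [Placeholder("ADAPTIVE_GUIDANCE", ""),
                        Placeholder("ESCALATION_LEVEL", "low")]))   # defaults -> clean prompt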
diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index 3547008a..72f45cce 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -141,8 +141,9 @@ class WorkflowProcessor: # Delegate to the appropriate mode result = await self.mode.executeTask(taskStep, workflow, context) - # Complete progress tracking - self.services.chat.progressLogFinish(operationId, True) + # Complete progress tracking based on actual result + taskSuccess = result.success if hasattr(result, 'success') else True + self.services.chat.progressLogFinish(operationId, taskSuccess) return result except Exception as e: @@ -329,7 +330,7 @@ class WorkflowProcessor: return handoverData except Exception as e: logger.error(f"Error in prepareTaskHandover: {str(e)}") - return {'error': str(e)} + raise # Fast Path Implementation @@ -379,10 +380,7 @@ class WorkflowProcessor: "################ USER INPUT START #################\n" ) - # Add sanitized user input with clear delimiters - # Escape curly braces for f-string safety, but preserve format (no quote wrapping) - sanitizedPrompt = prompt.replace('{', '{{').replace('}', '}}') if prompt else "" - complexityPrompt += f"{sanitizedPrompt}\n" + complexityPrompt += f"{prompt or ''}\n" complexityPrompt += "################ USER INPUT FINISH #################\n\n" @@ -469,17 +467,14 @@ class WorkflowProcessor: "Format your response as plain text (no markdown code blocks unless showing code examples)." ) - # Prepare AI call options for fast path (balanced, fast processing) - options = AiCallOptions( operationType=OperationTypeEnum.DATA_ANALYSE, priority=PriorityEnum.BALANCED, processingMode=ProcessingModeEnum.BASIC, - maxCost=0.10, # Low cost for simple requests - maxProcessingTime=15 # Fast path should complete in 15s + maxCost=0.10, + maxProcessingTime=15 ) - # Call AI via callAi() to ensure stats are stored aiRequest = AiCallRequest( prompt=fastPathPrompt, context="", @@ -630,17 +625,23 @@ class WorkflowProcessor: chatDocuments = [] if taskResult.actionResult and taskResult.actionResult.documents: for actionDoc in taskResult.actionResult.documents: - if hasattr(actionDoc, 'documentData') and actionDoc.documentData: - # Create file in component storage + if hasattr(actionDoc, 'documentData') and actionDoc.documentData is not None: + rawData = actionDoc.documentData + if isinstance(rawData, bytes): + contentBytes = rawData + elif isinstance(rawData, str): + contentBytes = rawData.encode('utf-8') + else: + contentBytes = json.dumps(rawData, ensure_ascii=False).encode('utf-8') + fileItem = self.services.interfaceDbComponent.createFile( name=actionDoc.documentName if hasattr(actionDoc, 'documentName') else f"task_{taskResult.taskId}_result.txt", mimeType=actionDoc.mimeType if hasattr(actionDoc, 'mimeType') else "text/plain", - content=actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8') + content=contentBytes ) - # Persist file data self.services.interfaceDbComponent.createFileData( fileItem.id, - actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8') + contentBytes ) # Get file info @@ -651,7 +652,7 @@ class WorkflowProcessor: chatDoc = { "fileId": fileItem.id, "fileName": fileInfo.get("fileName", actionDoc.documentName) if fileInfo else actionDoc.documentName, - "fileSize": fileInfo.get("size", len(actionDoc.documentData) if 
isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))) if fileInfo else (len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))), + "fileSize": fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes), "mimeType": fileInfo.get("mimeType", actionDoc.mimeType) if fileInfo else actionDoc.mimeType, "roundNumber": workflow.currentRound, "taskNumber": workflow.getTaskIndex(), diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index dfc617da..c81977c1 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -8,7 +8,6 @@ import json from modules.datamodels.datamodelChat import ( UserInputRequest, - ChatMessage, ChatWorkflow, ChatDocument, WorkflowModeEnum @@ -44,11 +43,6 @@ class WorkflowManager: # Store workflow in services for reference (this is the ChatWorkflow object) self.services.workflow = workflow - # CRITICAL: Update all method instances to use the current Services object with the correct workflow - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - discoverMethods(self.services) - logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}") - if workflow.status == "running": logger.info(f"Stopping running workflow {workflowId} before processing new prompt") workflow.status = "stopped" @@ -57,12 +51,13 @@ class WorkflowManager: "status": "stopped", "lastActivity": currentTime }) - self.services.chat.storeLog(workflow, { - "message": "Workflow stopped for new prompt", - "type": "info", - "status": "stopped", - "progress": 1.0 - }) + if workflow.status == "stopped": + self.services.chat.storeLog(workflow, { + "message": "Workflow stopped for new prompt", + "type": "info", + "status": "stopped", + "progress": 1.0 + }) newRound = workflow.currentRound + 1 self.services.chat.updateWorkflow(workflowId, { @@ -170,7 +165,10 @@ class WorkflowManager: self.services.currentUserPrompt = userInput.prompt # Reset progress logger for new workflow - self.services.chat._progressLogger = None + if hasattr(self.services.chat, 'resetProgressLogger'): + self.services.chat.resetProgressLogger() + else: + self.services.chat._progressLogger = None # Reset workflow history flag at start of each workflow setattr(self.services, '_needsWorkflowHistory', False) @@ -565,9 +563,10 @@ The following is the user's original input message. Analyze intent, normalize th logger.info(f"Fast path completed successfully, response length: {len(responseText)} chars") + except WorkflowStoppedException: + raise except Exception as e: logger.error(f"Error in _executeFastPath: {str(e)}") - # Fall back to full workflow on error logger.info("Falling back to full workflow due to fast path error") taskPlan = await self._planTasks(userInput) await self._executeTasks(taskPlan) @@ -897,8 +896,8 @@ The following is the user's original input message. Analyze intent, normalize th failedActions=[], successfulActions=[], criteriaProgress={ - 'met_criteria': set(), - 'unmet_criteria': set(), + 'met_criteria': [], + 'unmet_criteria': [], 'attempt_history': [] } ) @@ -1021,11 +1020,11 @@ The following is the user's original input message. 
Analyze intent, normalize th }) return elif workflow.status == 'failed': - # Create error message + lastError = getattr(workflow, '_lastError', None) or "Processing failed" errorMessage = { "workflowId": workflow.id, "role": "assistant", - "message": f"Workflow failed: {'Unknown error'}", + "message": f"Workflow failed: {lastError}", "status": "last", "sequenceNr": len(workflow.messages) + 1, "publishedAt": self.services.utils.timestampGetUtc(), @@ -1051,9 +1050,8 @@ The following is the user's original input message. Analyze intent, normalize th "totalActions": workflow.totalActions }) - # Add failed log entry self.services.chat.storeLog(workflow, { - "message": "Workflow failed: Unknown error", + "message": f"Workflow failed: {lastError}", "type": "error", "status": "failed", "progress": 1.0 @@ -1155,7 +1153,6 @@ The following is the user's original input message. Analyze intent, normalize th """Generate feedback message for workflow completion""" try: workflow = self.services.workflow - checkWorkflowStopped(self.services) # Count messages by role userMessages = [msg for msg in workflow.messages if msg.role == 'user'] @@ -1227,7 +1224,6 @@ The following is the user's original input message. Analyze intent, normalize th workflow = self.services.workflow logger.error(f"Workflow processing error: {str(error)}") - # Update workflow status to failed workflow.status = "failed" workflow.lastActivity = self.services.utils.timestampGetUtc() self.services.chat.updateWorkflow(workflow.id, { @@ -1237,11 +1233,10 @@ The following is the user's original input message. Analyze intent, normalize th "totalActions": workflow.totalActions }) - # Create error message error_message = { "workflowId": workflow.id, "role": "assistant", - "message": f"Workflow processing failed: {str(error)}", + "message": "Workflow processing encountered an error. Please try again.", "status": "last", "sequenceNr": len(workflow.messages) + 1, "publishedAt": self.services.utils.timestampGetUtc(), @@ -1257,15 +1252,12 @@ The following is the user's original input message. Analyze intent, normalize th } self.services.chat.storeMessageWithDocuments(workflow, error_message, []) - # Add error log entry self.services.chat.storeLog(workflow, { "message": f"Workflow failed: {str(error)}", "type": "error", "status": "failed", "progress": 1.0 }) - - raise async def _processFileIds(self, fileIds: List[str], messageId: str = None) -> List[ChatDocument]: """Process file IDs from existing files and return ChatDocument objects. @@ -1365,21 +1357,3 @@ The following is the user's original input message. Analyze intent, normalize th # Return original content on error return contentBytes - def _checkIfHistoryAvailable(self) -> bool: - """Check if workflow history is available (previous rounds exist). - - Returns True if there are previous workflow rounds with messages. - """ - try: - from modules.workflows.processing.shared.placeholderFactory import getPreviousRoundContext - - history = getPreviousRoundContext(self.services) - - # Check if history contains actual content (not just "No previous round context available") - if history and history != "No previous round context available": - return True - - return False - except Exception as e: - logger.error(f"Error checking if history is available: {str(e)}") - return False
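[Editor's note - illustrative sketch, not part of the patch] The reworked shouldContinue() in executionState.py now ignores the observation entirely and stops only when max_steps is reached or the review reports 'stop' or 'success'. The self-contained sketch below restates that contract and exercises it with a hypothetical Review stand-in (only the .status attribute is used); it is an illustration under those assumptions, not the module itself.

from dataclasses import dataclass

@dataclass
class Review:
    # Hypothetical stand-in for ReviewResult; only .status matters here
    status: str

def shouldContinue(observation=None, review=None, current_step: int = 0, max_steps: int = 1) -> bool:
    # Condensed restatement of the patched helper: the observation no longer affects the outcome
    if current_step >= max_steps:
        return False
    if review is not None:
        if hasattr(review, "status"):
            if review.status in ("stop", "success"):
                return False
        elif isinstance(review, dict):
            if (review.get("decision") or review.get("status")) in ("stop", "success"):
                return False
    return True

assert shouldContinue(review=Review("needs_work"), current_step=1, max_steps=5) is True
assert shouldContinue(review=Review("success"), current_step=1, max_steps=5) is False
assert shouldContinue(review={"decision": "stop"}, current_step=1, max_steps=5) is False
assert shouldContinue(current_step=5, max_steps=5) is False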