From d43044cc00d1d51478a29b316f18c8eb526872e5 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Wed, 19 Nov 2025 19:14:31 +0100
Subject: [PATCH] dynamic intelligent json merging strategies for cut json
parts after tokenmax exit
---
env_dev.env | 2 +-
env_prod.env | 2 +-
modules/aicore/aicorePluginAnthropic.py | 4 +-
modules/aicore/aicorePluginPerplexity.py | 2 +-
modules/services/serviceAi/mainServiceAi.py | 325 +++++++++++++++-
.../subPromptBuilderExtraction.py | 20 +-
modules/shared/debugLogger.py | 37 +-
.../workflows/processing/core/taskPlanner.py | 5 +
.../workflows/processing/modes/modeDynamic.py | 39 +-
.../processing/shared/placeholderFactory.py | 59 +--
.../shared/promptGenerationActionsDynamic.py | 12 +-
.../workflows/processing/workflowProcessor.py | 10 +-
modules/workflows/workflowManager.py | 36 +-
tests/functional/test05_openai_timeout.py | 367 ------------------
.../test05_workflow_with_documents.py | 351 +++++++++++++++++
15 files changed, 755 insertions(+), 516 deletions(-)
delete mode 100644 tests/functional/test05_openai_timeout.py
create mode 100644 tests/functional/test05_workflow_with_documents.py
diff --git a/env_dev.env b/env_dev.env
index da72e528..95b2b91e 100644
--- a/env_dev.env
+++ b/env_dev.env
@@ -73,7 +73,7 @@ Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET = DEV_ENC:Z0FBQUFBQm8xSUpEbm0yRUJ6VUJK
# Debug Configuration
APP_DEBUG_CHAT_WORKFLOW_ENABLED = True
-APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat
+APP_DEBUG_CHAT_WORKFLOW_DIR = ./local/debug
# Manadate Pre-Processing Servers
PREPROCESS_ALTHAUS_CHAT_SECRET = (empty)
\ No newline at end of file
diff --git a/env_prod.env b/env_prod.env
index c7699b03..0daaff02 100644
--- a/env_prod.env
+++ b/env_prod.env
@@ -73,7 +73,7 @@ Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z4d3Z4d2x6N1F
# Debug Configuration
APP_DEBUG_CHAT_WORKFLOW_ENABLED = FALSE
-APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat# Development Environment Configuration
+APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat
# Manadate Pre-Processing Servers
PREPROCESS_ALTHAUS_CHAT_SECRET = kj823u90209mj020394jp2msakhfkjashjkf
\ No newline at end of file
diff --git a/modules/aicore/aicorePluginAnthropic.py b/modules/aicore/aicorePluginAnthropic.py
index c26bdaf2..422056d0 100644
--- a/modules/aicore/aicorePluginAnthropic.py
+++ b/modules/aicore/aicorePluginAnthropic.py
@@ -26,8 +26,10 @@ class AiAnthropic(BaseConnectorAi):
self.apiKey = self.config["apiKey"]
# HttpClient for API calls
+ # Timeout set to 600 seconds (10 minutes) for complex requests that may take longer
+ # Document generation and complex AI operations can take significantly longer
self.httpClient = httpx.AsyncClient(
- timeout=120.0, # Longer timeout for complex requests
+ timeout=600.0,
headers={
"x-api-key": self.apiKey,
"anthropic-version": "2023-06-01", # Anthropic API Version
diff --git a/modules/aicore/aicorePluginPerplexity.py b/modules/aicore/aicorePluginPerplexity.py
index 86e06898..2a6f0890 100644
--- a/modules/aicore/aicorePluginPerplexity.py
+++ b/modules/aicore/aicorePluginPerplexity.py
@@ -27,7 +27,7 @@ class AiPerplexity(BaseConnectorAi):
# HttpClient for API calls
self.httpClient = httpx.AsyncClient(
- timeout=120.0, # Longer timeout for complex requests
+ timeout=600.0, # Timeout set to 600 seconds (10 minutes) for complex requests that may take longer
headers={
"Authorization": f"Bearer {self.apiKey}",
"Content-Type": "application/json",
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
index 055a8b9e..ada6cd22 100644
--- a/modules/services/serviceAi/mainServiceAi.py
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -33,7 +33,7 @@ class AiService:
"""
self.services = serviceCenter
# Only depend on interfaces
- self.aiObjects = None # Will be initialized in create() or _ensureAiObjectsInitialized()
+ self.aiObjects = None # Will be initialized in create() or ensureAiObjectsInitialized()
# Submodules initialized as None - will be set in _initializeSubmodules() after aiObjects is ready
self.extractionService = None
@@ -46,7 +46,7 @@ class AiService:
logger.info("Initializing ExtractionService...")
self.extractionService = ExtractionService(self.services)
- async def _ensureAiObjectsInitialized(self):
+ async def ensureAiObjectsInitialized(self):
"""Ensure aiObjects is initialized and submodules are ready."""
if self.aiObjects is None:
logger.info("Lazy initializing AiObjects...")
@@ -296,8 +296,10 @@ Respond with ONLY a JSON object in this exact format:
logger.warning(f"Iteration {iteration}: No sections extracted, stopping")
break
- # Add new sections to accumulator
- allSections.extend(extractedSections)
+ # Merge new sections with existing sections intelligently
+ # This handles the STANDARD CASE: broken JSON iterations must be merged together
+ # The break can occur anywhere - in any section, at any depth
+ allSections = self._mergeSectionsIntelligently(allSections, extractedSections, iteration)
# Check if we should continue (completion detection)
if self._shouldContinueGeneration(allSections, iteration, wasJsonComplete, result):
@@ -331,6 +333,317 @@ Respond with ONLY a JSON object in this exact format:
return final_result
+ def _mergeSectionsIntelligently(
+ self,
+ existingSections: List[Dict[str, Any]],
+ newSections: List[Dict[str, Any]],
+ iteration: int
+ ) -> List[Dict[str, Any]]:
+ """
+ Intelligently merge sections from multiple iterations.
+
+ This is a GENERIC merging strategy that handles broken JSON iterations.
+ The break can occur anywhere - in any section, at any depth.
+
+ Merging strategies (in order of priority):
+ 1. Same Section ID: Merge sections with identical IDs
+ 2. Same Content-Type + Position: If last section is incomplete and new section continues it
+ 3. Same Order: Merge sections with same order value
+ 4. Structural Analysis: Detect continuation based on content structure
+
+ Args:
+ existingSections: Sections accumulated from previous iterations
+ newSections: Sections extracted from current iteration
+ iteration: Current iteration number
+
+ Returns:
+ Merged list of sections
+ """
+ if not newSections:
+ return existingSections
+
+ if not existingSections:
+ return newSections
+
+ mergedSections = existingSections.copy()
+
+ for newSection in newSections:
+ merged = False
+
+ # Strategy 1: Same Section ID - merge directly
+ newSectionId = newSection.get("id")
+ if newSectionId:
+ for i, existingSection in enumerate(mergedSections):
+ if existingSection.get("id") == newSectionId:
+ # Merge sections with same ID
+ mergedSections[i] = self._mergeSectionContent(existingSection, newSection, iteration)
+ merged = True
+ logger.debug(f"Iteration {iteration}: Merged section by ID '{newSectionId}'")
+ break
+
+ if merged:
+ continue
+
+ # Strategy 2: Same Content-Type + Position (continuation detection)
+ # Check if last section is incomplete and new section continues it
+ if mergedSections:
+ lastSection = mergedSections[-1]
+ lastContentType = lastSection.get("content_type")
+ newContentType = newSection.get("content_type")
+
+ if lastContentType == newContentType:
+ # Same content type - check if last section is incomplete
+ if self._isSectionIncomplete(lastSection):
+ # Last section is incomplete, merge with new section
+ mergedSections[-1] = self._mergeSectionContent(lastSection, newSection, iteration)
+ merged = True
+ logger.debug(f"Iteration {iteration}: Merged section by content-type continuation ({lastContentType})")
+ continue
+
+ # Strategy 3: Same Order value
+ newOrder = newSection.get("order")
+ if newOrder is not None:
+ for i, existingSection in enumerate(mergedSections):
+ existingOrder = existingSection.get("order")
+ if existingOrder is not None and existingOrder == newOrder:
+ # Merge sections with same order
+ mergedSections[i] = self._mergeSectionContent(existingSection, newSection, iteration)
+ merged = True
+ logger.debug(f"Iteration {iteration}: Merged section by order {newOrder}")
+ break
+
+ if merged:
+ continue
+
+ # Strategy 4: Structural Analysis - detect continuation
+ # For code_block: if last section is code_block and new section is also code_block, merge
+ if mergedSections:
+ lastSection = mergedSections[-1]
+ if (lastSection.get("content_type") == "code_block" and
+ newSection.get("content_type") == "code_block"):
+ # Both are code blocks - merge them
+ mergedSections[-1] = self._mergeSectionContent(lastSection, newSection, iteration)
+ merged = True
+ logger.debug(f"Iteration {iteration}: Merged code_block sections by structural analysis")
+ continue
+
+ # No merge strategy matched - add as new section
+ if not merged:
+ mergedSections.append(newSection)
+ logger.debug(f"Iteration {iteration}: Added new section '{newSection.get('id', 'no-id')}' ({newSection.get('content_type', 'unknown')})")
+
+ return mergedSections
+
+ def _isSectionIncomplete(self, section: Dict[str, Any]) -> bool:
+ """
+ Check if a section is incomplete (broken at the end).
+
+ This detects incomplete sections based on content analysis:
+ - Code blocks: ends mid-line, ends with comma, ends with incomplete structure
+ - Text sections: ends mid-sentence, ends with incomplete structure
+ - Other types: check for incomplete elements
+ """
+ contentType = section.get("content_type", "")
+ elements = section.get("elements", [])
+
+ if not elements:
+ return False
+
+ # Handle list of elements
+ if isinstance(elements, list) and len(elements) > 0:
+ lastElement = elements[-1]
+ else:
+ lastElement = elements
+
+ if not isinstance(lastElement, dict):
+ return False
+
+ # Check code_block for incomplete code
+ if contentType == "code_block":
+ code = lastElement.get("code", "")
+ if code:
+ # Check if code ends incompletely:
+ # - Ends with comma (incomplete CSV line)
+ # - Ends with number but no newline (incomplete line)
+ # - Ends mid-token (e.g., "23431,23" - incomplete number)
+ codeStripped = code.rstrip()
+ if codeStripped:
+ # Check for incomplete patterns
+ if codeStripped.endswith(',') or (',' in codeStripped and not codeStripped.endswith('\n')):
+ # Ends with comma or has comma but no final newline - likely incomplete
+ return True
+ # Check if last line is incomplete (doesn't end with newline and has partial content)
+ if not code.endswith('\n') and codeStripped:
+ # No final newline - might be incomplete
+ # More sophisticated: check if last number is complete
+ lastLine = codeStripped.split('\n')[-1]
+ if lastLine and ',' in lastLine:
+ # Has commas but might be incomplete
+ parts = lastLine.split(',')
+ if parts and len(parts[-1]) < 5: # Last part is very short - might be incomplete
+ return True
+
+ # Check paragraph/text for incomplete sentences
+ if contentType in ["paragraph", "heading"]:
+ text = lastElement.get("text", "")
+ if text:
+ # Simple heuristic: if doesn't end with sentence-ending punctuation
+ textStripped = text.rstrip()
+ if textStripped and not textStripped[-1] in '.!?':
+ # Might be incomplete, but this is less reliable
+ # Only mark as incomplete if very short (likely cut off)
+ if len(textStripped) < 20:
+ return True
+
+ return False
+
+ def _mergeSectionContent(
+ self,
+ existingSection: Dict[str, Any],
+ newSection: Dict[str, Any],
+ iteration: int
+ ) -> Dict[str, Any]:
+ """
+ Merge content from two sections.
+
+ Handles different content types:
+ - code_block: Append code, handle overlaps, merge incomplete lines
+ - paragraph/heading: Append text
+ - table: Merge rows
+ - list: Merge items
+ - Other: Merge elements
+ """
+ contentType = existingSection.get("content_type", "")
+ existingElements = existingSection.get("elements", [])
+ newElements = newSection.get("elements", [])
+
+ if not newElements:
+ return existingSection
+
+ # Handle list of elements
+ if isinstance(existingElements, list):
+ existingElem = existingElements[-1] if existingElements else {}
+ else:
+ existingElem = existingElements
+
+ if isinstance(newElements, list):
+ newElem = newElements[0] if newElements else {}
+ else:
+ newElem = newElements
+
+ if not isinstance(existingElem, dict) or not isinstance(newElem, dict):
+ return existingSection
+
+ # Merge based on content type
+ if contentType == "code_block":
+ existingCode = existingElem.get("code", "")
+ newCode = newElem.get("code", "")
+
+ if existingCode and newCode:
+ mergedCode = self._mergeCodeBlocks(existingCode, newCode, iteration)
+ existingElem["code"] = mergedCode
+ # Preserve language from existing or new
+ if "language" not in existingElem and "language" in newElem:
+ existingElem["language"] = newElem["language"]
+
+ elif contentType in ["paragraph", "heading"]:
+ existingText = existingElem.get("text", "")
+ newText = newElem.get("text", "")
+
+ if existingText and newText:
+ # Append text with space if needed
+ if existingText.rstrip() and not existingText.rstrip()[-1] in '.!?\n':
+ mergedText = existingText.rstrip() + " " + newText.lstrip()
+ else:
+ mergedText = existingText.rstrip() + "\n" + newText.lstrip()
+ existingElem["text"] = mergedText
+
+ elif contentType == "table":
+ # Merge table rows
+ existingRows = existingElem.get("rows", [])
+ newRows = newElem.get("rows", [])
+ if existingRows and newRows:
+ existingElem["rows"] = existingRows + newRows
+
+ elif contentType in ["bullet_list", "numbered_list"]:
+ # Merge list items
+ existingItems = existingElem.get("items", [])
+ newItems = newElem.get("items", [])
+ if existingItems and newItems:
+ existingElem["items"] = existingItems + newItems
+
+ # Update section with merged content
+ mergedSection = existingSection.copy()
+ if isinstance(existingElements, list):
+ mergedSection["elements"] = existingElements
+ else:
+ mergedSection["elements"] = existingElem
+
+ # Preserve metadata from new section if missing in existing
+ if "order" not in mergedSection and "order" in newSection:
+ mergedSection["order"] = newSection["order"]
+
+ return mergedSection
+
+ def _mergeCodeBlocks(self, existingCode: str, newCode: str, iteration: int) -> str:
+ """
+ Merge two code blocks intelligently, handling overlaps and incomplete lines.
+ """
+ if not existingCode:
+ return newCode
+ if not newCode:
+ return existingCode
+
+ existingLines = existingCode.rstrip().split('\n')
+ newLines = newCode.strip().split('\n')
+
+ if not existingLines or not newLines:
+ return existingCode + "\n" + newCode
+
+ lastExistingLine = existingLines[-1].strip()
+ firstNewLine = newLines[0].strip()
+
+ # Strategy 1: Exact overlap - remove duplicate line
+ if lastExistingLine == firstNewLine:
+ newLines = newLines[1:]
+ logger.debug(f"Iteration {iteration}: Removed exact duplicate line in code merge")
+
+ # Strategy 2: Incomplete line merge
+ # If last existing line ends with comma or is incomplete, merge with first new line
+ elif lastExistingLine.endswith(',') or (',' in lastExistingLine and len(lastExistingLine.split(',')[-1]) < 5):
+ # Last line is incomplete - merge with first new line
+ # Remove trailing comma from existing line
+ mergedLine = lastExistingLine.rstrip(',') + ',' + firstNewLine.lstrip()
+ existingLines[-1] = mergedLine
+ newLines = newLines[1:]
+ logger.debug(f"Iteration {iteration}: Merged incomplete line with continuation")
+
+ # Strategy 3: Partial overlap detection
+ # Check if first new line starts with the end of last existing line
+ elif ',' in lastExistingLine and ',' in firstNewLine:
+ lastExistingParts = lastExistingLine.split(',')
+ firstNewParts = firstNewLine.split(',')
+
+ # Check for overlap: if last part of existing matches first part of new
+ if lastExistingParts and firstNewParts:
+ lastExistingPart = lastExistingParts[-1].strip()
+ firstNewPart = firstNewParts[0].strip()
+
+ # If they match, there's overlap
+ if lastExistingPart == firstNewPart and len(lastExistingParts) > 1:
+ # Remove overlapping part from new line
+ newLines[0] = ','.join(firstNewParts[1:])
+ logger.debug(f"Iteration {iteration}: Removed partial overlap in code merge")
+
+ # Reconstruct merged code
+ mergedCode = '\n'.join(existingLines)
+ if newLines:
+ if mergedCode and not mergedCode.endswith('\n'):
+ mergedCode += '\n'
+ mergedCode += '\n'.join(newLines)
+
+ return mergedCode
+
def _extractSectionsFromResponse(
self,
result: str,
@@ -513,7 +826,7 @@ Respond with ONLY a JSON object in this exact format:
Returns:
Planning JSON response
"""
- await self._ensureAiObjectsInitialized()
+ await self.ensureAiObjectsInitialized()
# Planning calls always use static parameters
options = AiCallOptions(
@@ -569,7 +882,7 @@ Respond with ONLY a JSON object in this exact format:
Returns:
AiResponse with content, metadata, and optional documents
"""
- await self._ensureAiObjectsInitialized()
+ await self.ensureAiObjectsInitialized()
# Create separate operationId for detailed progress tracking
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
diff --git a/modules/services/serviceExtraction/subPromptBuilderExtraction.py b/modules/services/serviceExtraction/subPromptBuilderExtraction.py
index a796ea3b..f6329a5c 100644
--- a/modules/services/serviceExtraction/subPromptBuilderExtraction.py
+++ b/modules/services/serviceExtraction/subPromptBuilderExtraction.py
@@ -156,24 +156,8 @@ Extract the ACTUAL CONTENT from the source documents. Do not use placeholder tex
pass
# Save extraction prompt to debug file - only if debug enabled
- if services:
- try:
- debug_enabled = services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- import os
- from datetime import datetime, UTC
- ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- from modules.shared.configuration import APP_CONFIG
- logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
- if not os.path.isabs(logDir):
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- logDir = os.path.join(gatewayDir, logDir)
- debug_root = os.path.join(logDir, 'debug')
- os.makedirs(debug_root, exist_ok=True)
- with open(os.path.join(debug_root, f"{ts}_extraction_prompt.txt"), "w", encoding="utf-8") as f:
- f.write(adaptive_prompt)
- except Exception:
- pass
+ from modules.shared.debugLogger import writeDebugFile
+ writeDebugFile(adaptive_prompt, "extraction_prompt")
return adaptive_prompt
diff --git a/modules/shared/debugLogger.py b/modules/shared/debugLogger.py
index 08fccc63..c68546bf 100644
--- a/modules/shared/debugLogger.py
+++ b/modules/shared/debugLogger.py
@@ -25,14 +25,28 @@ def _isDebugEnabled() -> bool:
"""Check if debug workflow logging is enabled."""
return APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
-def _getDebugDir() -> str:
- """Get the debug directory path from configuration."""
- # Get log directory from config (same as used by main logging system)
+def _getBaseDebugDir() -> str:
+ """Get the base debug directory path from configuration."""
+ # Check if custom debug directory is configured
+ customDebugDir = APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_DIR", None)
+ if customDebugDir:
+ # Use custom debug directory if configured
+ if not os.path.isabs(customDebugDir):
+ # If relative path, make it relative to the gateway directory
+ gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+ customDebugDir = os.path.join(gatewayDir, customDebugDir)
+ return customDebugDir
+
+ # Default: Get log directory from config (same as used by main logging system)
logDir = _resolveLogDir()
# Create debug subdirectory within the log directory
- debugDir = os.path.join(logDir, 'debug/prompts')
- return debugDir
+ return os.path.join(logDir, 'debug')
+
+def _getDebugDir() -> str:
+ """Get the debug prompts directory path from configuration."""
+ baseDebugDir = _getBaseDebugDir()
+ return os.path.join(baseDebugDir, 'prompts')
def _getNextSequenceNumber() -> int:
"""Get the next sequence number by counting existing files."""
@@ -50,7 +64,7 @@ def writeDebugFile(content: str, fileType: str, documents: Optional[List] = None
Write debug content to a file with sequential numbering.
Writes the content as-is since it's already the final integrated prompt.
Includes document list labels for tracing enhancement.
- Only writes if debug logging is enabled via APP_DEBUG_CHAT_WORKFLOW_ENABLED config.
+ Only writes if debug logging is enabled via _isDebugEnabled() function.
Args:
content: The main content to write (already integrated)
@@ -111,9 +125,8 @@ def debugLogToFile(message: str, context: str = "DEBUG") -> None:
if not _isDebugEnabled():
return
- # Get debug directory
- logDir = _resolveLogDir()
- debug_dir = os.path.join(logDir, 'debug')
+ # Get debug directory (use base debug dir, not prompts subdirectory)
+ debug_dir = _getBaseDebugDir()
_ensureDir(debug_dir)
# Create debug file path
@@ -148,9 +161,9 @@ def storeDebugMessageAndDocuments(message, currentUser) -> None:
import json
from datetime import datetime, UTC
- # Create base debug directory
- logDir = _resolveLogDir()
- debug_root = os.path.join(logDir, 'debug', 'messages')
+ # Create base debug directory (use base debug dir, not prompts subdirectory)
+ baseDebugDir = _getBaseDebugDir()
+ debug_root = os.path.join(baseDebugDir, 'messages')
_ensureDir(debug_root)
# Generate timestamp
diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py
index d9baa462..78f2da3e 100644
--- a/modules/workflows/processing/core/taskPlanner.py
+++ b/modules/workflows/processing/core/taskPlanner.py
@@ -157,6 +157,11 @@ class TaskPlanner:
if 'description' in taskDict and 'objective' not in taskDict:
taskDict['objective'] = taskDict.pop('description')
+ # Ensure objective is always set (required field)
+ if 'objective' not in taskDict or not taskDict.get('objective'):
+ logger.warning(f"Task {i+1} missing objective, using fallback")
+ taskDict['objective'] = actualUserPrompt or 'Task objective not specified'
+
# Extract format details from workflow intent and populate TaskStep
# Use workflow-level intent for format requirements (tasks inherit from workflow)
if isinstance(workflowIntent, dict):
diff --git a/modules/workflows/processing/modes/modeDynamic.py b/modules/workflows/processing/modes/modeDynamic.py
index 6af8e8e1..49097bf4 100644
--- a/modules/workflows/processing/modes/modeDynamic.py
+++ b/modules/workflows/processing/modes/modeDynamic.py
@@ -367,27 +367,25 @@ class DynamicMode(BaseMode):
debugType="paramplan"
)
- # Parse JSON response using structured parsing with ActionDefinition model
- from modules.shared.jsonUtils import parseJsonWithModel
- from modules.datamodels.datamodelWorkflow import ActionDefinition
+ # Parse JSON response - Stage 2 only returns parameters structure, not full ActionDefinition
+ from modules.shared.jsonUtils import tryParseJson
- try:
- # Parse response string as ActionDefinition (Stage 2 adds parameters)
- actionDef = parseJsonWithModel(paramsResp, ActionDefinition)
- # Extract parameters from parsed model
- parameters = actionDef.parameters if actionDef.parameters else {}
-
- # Extract userMessage from Stage 2 response if available
- # Stage 2 can override Stage 1 userMessage with more specific message
- if hasattr(actionDef, 'userMessage') and actionDef.userMessage:
- selection['userMessage'] = actionDef.userMessage
- except ValueError as e:
- logger.error(f"Failed to parse ActionDefinition from parameters response: {e}")
+ jsonObj, parseError, cleanedStr = tryParseJson(paramsResp)
+ if parseError or not isinstance(jsonObj, dict):
+ logger.error(f"Failed to parse JSON from parameters response: {parseError}")
logger.error(f"Response was: {paramsResp[:500]}...")
- raise ValueError(f"AI parameters response invalid: {e}")
+ raise ValueError(f"AI parameters response invalid JSON: {parseError}")
+ # Extract parameters from response (Stage 2 only provides parameters, not full ActionDefinition)
+ parameters = jsonObj.get('parameters', {})
if not isinstance(parameters, dict):
raise ValueError("AI parameters response missing 'parameters' object")
+
+ # Extract userMessage from Stage 2 response if available
+ # Stage 2 can override Stage 1 userMessage with more specific message
+ userMessage = jsonObj.get('userMessage')
+ if userMessage:
+ selection['userMessage'] = userMessage
# Merge Stage 1 resource selections into Stage 2 parameters (only if action expects them)
try:
@@ -731,15 +729,8 @@ class DynamicMode(BaseMode):
)
try:
- # Parse response string as ReviewResult
+ # Parse response string as ReviewResult (prompt now correctly asks for "status")
decision = parseJsonWithModel(resp, ReviewResult)
-
- # Map "stop" decision to "success" status for ReviewResult
- if hasattr(decision, 'decision') and decision.decision == 'stop':
- decision.status = 'success'
- elif not hasattr(decision, 'status') or not decision.status:
- decision.status = 'continue'
-
return decision
except ValueError as e:
logger.warning(f"Failed to parse ReviewResult from response: {e}. Using default.")
diff --git a/modules/workflows/processing/shared/placeholderFactory.py b/modules/workflows/processing/shared/placeholderFactory.py
index 4f3f6ad8..1f4c5845 100644
--- a/modules/workflows/processing/shared/placeholderFactory.py
+++ b/modules/workflows/processing/shared/placeholderFactory.py
@@ -10,8 +10,8 @@ NAMING CONVENTION:
MAPPING TABLE (keys β function) with usage [taskplan | dynamic]:
{{KEY:USER_PROMPT}} -> extractUserPrompt() [taskplan, dynamic]
-{{KEY:OVERALL_TASK_CONTEXT}} -> extractOverallTaskContext() [dynamic]
-{{KEY:TASK_OBJECTIVE}} -> extractTaskObjective() [dynamic]
+{{KEY:OVERALL_TASK_CONTEXT}} -> services.currentUserPromptNormalized (always set in WorkflowManager._sendFirstMessage) [direct]
+{{KEY:TASK_OBJECTIVE}} -> context.taskStep.objective (always set in TaskPlanner.generateTaskPlan) [direct]
{{KEY:USER_LANGUAGE}} -> extractUserLanguage() [dynamic]
{{KEY:LANGUAGE_USER_DETECTED}} -> extractLanguageUserDetected() [taskplan]
{{KEY:WORKFLOW_HISTORY}} -> extractWorkflowHistory() [taskplan, dynamic]
@@ -38,57 +38,6 @@ from typing import Dict, Any, List
logger = logging.getLogger(__name__)
from modules.workflows.processing.shared.methodDiscovery import (methods, discoverMethods)
-def extractOverallTaskContext(service: Any, context: Any) -> str:
- """Extract the original normalized user request (overall task context). Maps to {{KEY:OVERALL_TASK_CONTEXT}}.
- Always returns the original user request, not the task objective.
- """
- try:
- # Always prefer the normalized user prompt from services (original request)
- if service:
- # Prefer normalized version if available
- normalized = getattr(service, 'currentUserPromptNormalized', None)
- if normalized:
- return normalized
-
- # Fallback to currentUserPrompt (original request)
- currentPrompt = getattr(service, 'currentUserPrompt', None)
- if currentPrompt:
- return currentPrompt
-
- # If no services available, try to get from workflow's first message
- if hasattr(context, 'workflow') and context.workflow:
- messages = getattr(context.workflow, 'messages', []) or []
- if messages:
- firstMessage = messages[0]
- msgContent = getattr(firstMessage, 'message', None) or ''
- if msgContent:
- return msgContent
-
- return 'No overall task context available'
- except Exception:
- return 'No overall task context available'
-
-def extractTaskObjective(context: Any) -> str:
- """Extract the task objective from taskStep. Maps to {{KEY:TASK_OBJECTIVE}}.
- Returns the specific task objective, not the overall user request.
- """
- try:
- if hasattr(context, 'taskStep') and context.taskStep:
- objective = getattr(context.taskStep, 'objective', None)
- if objective:
- return objective
-
- # Fallback: try to get from services
- services = getattr(context, 'services', None)
- if services:
- currentPrompt = getattr(services, 'currentUserPrompt', None)
- if currentPrompt:
- return currentPrompt
-
- return 'No task objective specified'
- except Exception:
- return 'No task objective specified'
-
def extractUserPrompt(context: Any) -> str:
"""Extract user prompt from context. Maps to {{KEY:USER_PROMPT}}.
Prefer the cleaned intent stored on the services object if available via context.
@@ -102,7 +51,7 @@ def extractUserPrompt(context: Any) -> str:
if services and getattr(services, 'currentUserPrompt', None):
rawPrompt = services.currentUserPrompt
elif hasattr(context, 'taskStep') and context.taskStep:
- rawPrompt = context.taskStep.objective or 'No request specified'
+ rawPrompt = context.taskStep.objective
else:
rawPrompt = 'No request specified'
@@ -114,7 +63,7 @@ def extractUserPrompt(context: Any) -> str:
except Exception:
# Robust fallback behavior
if hasattr(context, 'taskStep') and context.taskStep:
- return context.taskStep.objective or 'No request specified'
+ return context.taskStep.objective
return 'No request specified'
def extractWorkflowHistory(service: Any) -> str:
diff --git a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py
index 7985fdb1..1ed82551 100644
--- a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py
+++ b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py
@@ -17,16 +17,14 @@ from modules.workflows.processing.shared.placeholderFactory import (
extractLearningsAndImprovements,
extractLatestRefinementFeedback,
extractWorkflowHistory,
- extractOverallTaskContext,
- extractTaskObjective,
)
from modules.workflows.processing.shared.methodDiscovery import methods, getActionParameterList
def generateDynamicPlanSelectionPrompt(services, context: Any, learningEngine=None) -> PromptBundle:
"""Define placeholders first, then the template; return PromptBundle."""
placeholders: List[PromptPlaceholder] = [
- PromptPlaceholder(label="OVERALL_TASK_CONTEXT", content=extractOverallTaskContext(services, context), summaryAllowed=False),
- PromptPlaceholder(label="TASK_OBJECTIVE", content=extractTaskObjective(context), summaryAllowed=False),
+ PromptPlaceholder(label="OVERALL_TASK_CONTEXT", content=services.currentUserPromptNormalized, summaryAllowed=False),
+ PromptPlaceholder(label="TASK_OBJECTIVE", content=context.taskStep.objective, summaryAllowed=False),
PromptPlaceholder(label="USER_PROMPT", content=extractUserPrompt(context), summaryAllowed=False),
PromptPlaceholder(label="USER_LANGUAGE", content=extractUserLanguage(services), summaryAllowed=False),
PromptPlaceholder(label="AVAILABLE_DOCUMENTS_SUMMARY", content=extractAvailableDocumentsSummary(services, context), summaryAllowed=True),
@@ -197,7 +195,7 @@ Excludes documents/connections/history entirely.
learningsText = ""
placeholders: List[PromptPlaceholder] = [
- PromptPlaceholder(label="OVERALL_TASK_CONTEXT", content=extractOverallTaskContext(services, context), summaryAllowed=False),
+ PromptPlaceholder(label="OVERALL_TASK_CONTEXT", content=services.currentUserPromptNormalized, summaryAllowed=False),
PromptPlaceholder(label="ACTION_OBJECTIVE", content=actionObjective, summaryAllowed=False),
PromptPlaceholder(label="SELECTED_ACTION", content=compoundActionName, summaryAllowed=False),
PromptPlaceholder(label="USER_LANGUAGE", content=extractUserLanguage(services), summaryAllowed=False),
@@ -296,12 +294,12 @@ OBJECTIVE: '{{KEY:USER_PROMPT}}'
DECISION RULES:
1. "continue" = objective NOT fulfilled
-2. "stop" = objective fulfilled
+2. "success" = objective fulfilled
3. Return ONLY JSON - no other text
OUTPUT FORMAT (only JSON object to deliver):
{{
- "decision": "continue",
+ "status": "continue",
"reason": "Brief reason for decision"
}}
diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py
index 34fa8c12..b6b0a386 100644
--- a/modules/workflows/processing/workflowProcessor.py
+++ b/modules/workflows/processing/workflowProcessor.py
@@ -324,7 +324,7 @@ class WorkflowProcessor:
"""
try:
# Ensure AI service is initialized
- await self.services.ai._ensureAiObjectsInitialized()
+ await self.services.ai.ensureAiObjectsInitialized()
# Build complexity detection prompt (language-agnostic, semantic)
complexityPrompt = (
@@ -368,8 +368,8 @@ class WorkflowProcessor:
# Parse response
complexity = "moderate" # Default fallback
try:
- # Extract response content (AiResponse.content is a string)
- responseContent = aiResponse.content if isinstance(aiResponse, str) else (aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse))
+ # callAiPlanning returns a string directly, not an object
+ responseContent = str(aiResponse) if aiResponse else ""
# Extract JSON from response
jsonStr = extractJsonString(responseContent)
@@ -409,7 +409,7 @@ class WorkflowProcessor:
"""
try:
# Ensure AI service is initialized
- await self.services.ai._ensureAiObjectsInitialized()
+ await self.services.ai.ensureAiObjectsInitialized()
# Build fast path prompt (understand + execute + deliver in one call)
fastPathPrompt = (
@@ -496,7 +496,7 @@ class WorkflowProcessor:
from modules.shared.jsonUtils import parseJsonWithModel
# Ensure AI service is initialized
- await self.services.ai._ensureAiObjectsInitialized()
+ await self.services.ai.ensureAiObjectsInitialized()
# Build combined understanding prompt
understandingPrompt = (
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index 85a6e32f..e5ffd2c6 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -161,23 +161,23 @@ class WorkflowManager:
self.services.chat._progressLogger = None
self.workflowProcessor = WorkflowProcessor(self.services)
- await self._sendFirstMessage(userInput)
- # Fast Path Detection and Routing
- # Get documents from first message if available
- workflow = self.services.workflow
+ # Process user-uploaded documents from userInput for complexity detection
+ # This is the correct way: use the input data directly, not workflow state
documents = []
- if workflow.messages and len(workflow.messages) > 0:
- # Get documents from the first message
- firstMessageId = workflow.messages[0] if isinstance(workflow.messages[0], str) else workflow.messages[0].id
- firstMessage = self.services.chat.getMessage(firstMessageId)
- if firstMessage and hasattr(firstMessage, 'documents'):
- documents = firstMessage.documents
+ if userInput.listFileId:
+ try:
+ documents = await self._processFileIds(userInput.listFileId, None)
+ except Exception as e:
+ logger.warning(f"Failed to process user fileIds for complexity detection: {e}")
- # Detect complexity (AI-based semantic understanding)
+ # Detect complexity (AI-based semantic understanding) using user input documents
complexity = await self.workflowProcessor.detectComplexity(userInput.prompt, documents)
logger.info(f"Request complexity detected: {complexity}")
+ # Now send the first message (which will also process the documents again, but that's fine)
+ await self._sendFirstMessage(userInput)
+
# Route to fast path for simple requests
if complexity == "simple":
logger.info("Routing to fast path for simple request")
@@ -342,6 +342,8 @@ class WorkflowManager:
logger.info("Skipping user intention analysis for AUTOMATION mode - using direct user input")
# For automation mode, use user input directly without AI analysis
self.services.currentUserPrompt = userInput.prompt
+ # Always set currentUserPromptNormalized - use user input directly for automation mode
+ self.services.currentUserPromptNormalized = userInput.prompt
detectedLanguage = None
normalizedRequest = None
intentText = userInput.prompt
@@ -409,13 +411,11 @@ class WorkflowManager:
except Exception:
pass
self.services.currentUserPrompt = intentText or userInput.prompt
- try:
- if normalizedRequest:
- setattr(self.services, 'currentUserPromptNormalized', normalizedRequest)
- if contextItems is not None:
- setattr(self.services, 'currentUserContextItems', contextItems)
- except Exception:
- pass
+ # Always set currentUserPromptNormalized - use normalizedRequest if available, otherwise fallback to currentUserPrompt
+ normalizedValue = normalizedRequest or intentText or userInput.prompt
+ self.services.currentUserPromptNormalized = normalizedValue
+ if contextItems is not None:
+ self.services.currentUserContextItems = contextItems
# Create documents for context items
if contextItems and isinstance(contextItems, list):
diff --git a/tests/functional/test05_openai_timeout.py b/tests/functional/test05_openai_timeout.py
deleted file mode 100644
index 68695c28..00000000
--- a/tests/functional/test05_openai_timeout.py
+++ /dev/null
@@ -1,367 +0,0 @@
-#!/usr/bin/env python3
-"""
-OpenAI Timeout Analysis Test - Tests OpenAI API calls to identify timeout issues
-Compares different scenarios to understand why OpenAI calls fail in functional tests but work in module tests.
-"""
-
-import asyncio
-import json
-import sys
-import os
-import time
-from typing import Dict, Any, List
-
-# Add the gateway to path (go up 2 levels from tests/functional/)
-_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
-if _gateway_path not in sys.path:
- sys.path.insert(0, _gateway_path)
-
-from modules.services import getInterface as getServices
-from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
-from modules.datamodels.datamodelChat import ChatWorkflow, WorkflowModeEnum
-import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects
-
-
-class OpenAITimeoutTester:
- """Test OpenAI API calls to identify timeout issues."""
-
- def __init__(self):
- # Use root user for testing (has full access to everything)
- from modules.interfaces.interfaceDbAppObjects import getRootInterface
- rootInterface = getRootInterface()
- self.testUser = rootInterface.currentUser
-
- # Initialize services
- self.services = getServices(self.testUser, None)
- self.testResults = []
-
- async def initialize(self):
- """Initialize workflow and services."""
- import logging
- import uuid
- import time as time_module
-
- # Set logging level to DEBUG to see detailed logs
- logging.getLogger().setLevel(logging.DEBUG)
-
- # Create and save workflow in database
- currentTimestamp = time_module.time()
-
- testWorkflow = ChatWorkflow(
- id=str(uuid.uuid4()),
- name="OpenAI Timeout Test Workflow",
- status="running",
- startedAt=currentTimestamp,
- lastActivity=currentTimestamp,
- currentRound=1,
- currentTask=0,
- currentAction=0,
- totalTasks=0,
- totalActions=0,
- mandateId=self.testUser.mandateId,
- messageIds=[],
- workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
- maxSteps=5
- )
-
- # Save workflow to database
- interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
- workflowDict = testWorkflow.model_dump()
- interfaceDbChat.createWorkflow(workflowDict)
-
- # Set the workflow in services
- self.services.workflow = testWorkflow
-
- print("β
Services initialized")
- print(f"π Workflow ID: {testWorkflow.id}")
-
- async def testDirectOpenAICall(self, prompt: str, description: str) -> Dict[str, Any]:
- """Test direct OpenAI API call through the connector."""
- print(f"\n{'='*80}")
- print(f"TEST: {description}")
- print(f"{'='*80}")
- print(f"Prompt: {prompt[:100]}...")
-
- startTime = time.time()
- result = {
- "description": description,
- "prompt": prompt,
- "success": False,
- "error": None,
- "responseTime": 0,
- "responseLength": 0,
- "method": "direct_connector"
- }
-
- try:
- # Get OpenAI connector directly
- from modules.aicore.aicorePluginOpenai import AiOpenai
- from modules.datamodels.datamodelAi import AiModelCall, AiModel
-
- # Initialize connector
- connector = AiOpenai()
-
- # Get the gpt-4o model
- models = connector.getModels()
- gpt4oModel = None
- for model in models:
- if model.name == "gpt-4o":
- gpt4oModel = model
- break
-
- if not gpt4oModel:
- result["error"] = "gpt-4o model not found"
- return result
-
- # Create model call
- modelCall = AiModelCall(
- model=gpt4oModel,
- messages=[{"role": "user", "content": prompt}],
- options=AiCallOptions()
- )
-
- # Make the call
- print(f"β±οΈ Starting API call...")
- response = await connector.callAiBasic(modelCall)
-
- endTime = time.time()
- responseTime = endTime - startTime
-
- result["success"] = True
- result["responseTime"] = responseTime
- result["responseLength"] = len(response.content) if response.content else 0
-
- print(f"β
Success! Response time: {responseTime:.2f}s")
- print(f"π Response length: {result['responseLength']} characters")
- print(f"π Response preview: {response.content[:200] if response.content else 'None'}...")
-
- except Exception as e:
- endTime = time.time()
- responseTime = endTime - startTime
-
- result["error"] = str(e)
- result["responseTime"] = responseTime
-
- print(f"β Failed after {responseTime:.2f}s")
- print(f"π₯ Error: {type(e).__name__}: {str(e)}")
- import traceback
- print(f"π Traceback:\n{traceback.format_exc()}")
-
- self.testResults.append(result)
- return result
-
- async def testThroughAiService(self, prompt: str, description: str) -> Dict[str, Any]:
- """Test OpenAI call through AiService.callAiContent."""
- print(f"\n{'='*80}")
- print(f"TEST: {description}")
- print(f"{'='*80}")
- print(f"Prompt: {prompt[:100]}...")
-
- startTime = time.time()
- result = {
- "description": description,
- "prompt": prompt,
- "success": False,
- "error": None,
- "responseTime": 0,
- "responseLength": 0,
- "method": "ai_service"
- }
-
- try:
- from modules.datamodels.datamodelWorkflow import AiResponse
-
- options = AiCallOptions(
- operationType=OperationTypeEnum.DATA_GENERATE
- )
-
- print(f"β±οΈ Starting AI service call...")
- aiResponse: AiResponse = await self.services.ai.callAiContent(
- prompt=prompt,
- options=options,
- outputFormat="json"
- )
-
- endTime = time.time()
- responseTime = endTime - startTime
-
- result["success"] = True
- result["responseTime"] = responseTime
-
- if isinstance(aiResponse, AiResponse):
- content = aiResponse.content if aiResponse.content else ""
- result["responseLength"] = len(content)
- print(f"β
Success! Response time: {responseTime:.2f}s")
- print(f"π Response length: {result['responseLength']} characters")
- print(f"π Response preview: {content[:200] if content else 'None'}...")
- else:
- result["responseLength"] = len(str(aiResponse))
- print(f"β
Success! Response time: {responseTime:.2f}s")
- print(f"π Response length: {result['responseLength']} characters")
-
- except Exception as e:
- endTime = time.time()
- responseTime = endTime - startTime
-
- result["error"] = str(e)
- result["responseTime"] = responseTime
-
- print(f"β Failed after {responseTime:.2f}s")
- print(f"π₯ Error: {type(e).__name__}: {str(e)}")
- import traceback
- print(f"π Traceback:\n{traceback.format_exc()}")
-
- self.testResults.append(result)
- return result
-
- async def testTimeoutConfiguration(self) -> Dict[str, Any]:
- """Test timeout configuration of OpenAI connector."""
- print(f"\n{'='*80}")
- print("TEST: Timeout Configuration Analysis")
- print(f"{'='*80}")
-
- result = {
- "description": "Timeout Configuration Analysis",
- "timeout": None,
- "httpClientTimeout": None,
- "connectorType": None
- }
-
- try:
- from modules.aicore.aicorePluginOpenai import AiOpenai
-
- connector = AiOpenai()
- result["connectorType"] = connector.getConnectorType()
-
- # Check httpClient timeout
- if hasattr(connector, 'httpClient'):
- httpClient = connector.httpClient
- if hasattr(httpClient, 'timeout'):
- result["httpClientTimeout"] = str(httpClient.timeout)
- print(f"π HttpClient timeout: {httpClient.timeout}")
- else:
- print(f"β οΈ HttpClient has no timeout attribute")
-
- # Check for timeout in config
- from modules.shared.configuration import APP_CONFIG
- openaiTimeout = APP_CONFIG.get('Connector_AiOpenai_TIMEOUT', None)
- if openaiTimeout:
- result["timeout"] = openaiTimeout
- print(f"π Config timeout: {openaiTimeout}")
- else:
- print(f"π No timeout in config (using default)")
-
- print(f"β
Timeout analysis complete")
-
- except Exception as e:
- result["error"] = str(e)
- print(f"β Error analyzing timeout: {str(e)}")
- import traceback
- print(f"π Traceback:\n{traceback.format_exc()}")
-
- self.testResults.append(result)
- return result
-
- def printSummary(self):
- """Print test summary."""
- print(f"\n{'='*80}")
- print("OPENAI TIMEOUT TEST SUMMARY")
- print(f"{'='*80}")
-
- for i, result in enumerate(self.testResults, 1):
- print(f"\n[{i}] {result.get('description', 'Unknown')}")
- print(f" Method: {result.get('method', 'N/A')}")
- print(f" Success: {'β
' if result.get('success') else 'β'}")
- if result.get('responseTime'):
- print(f" Response Time: {result['responseTime']:.2f}s")
- if result.get('responseLength'):
- print(f" Response Length: {result['responseLength']} characters")
- if result.get('error'):
- print(f" Error: {result['error'][:200]}...")
- if result.get('timeout'):
- print(f" Timeout Config: {result['timeout']}")
- if result.get('httpClientTimeout'):
- print(f" HttpClient Timeout: {result['httpClientTimeout']}")
-
- # Analyze failures
- failures = [r for r in self.testResults if not r.get('success')]
- if failures:
- print(f"\n{'='*80}")
- print(f"FAILURES: {len(failures)}/{len(self.testResults)}")
- print(f"{'='*80}")
- for failure in failures:
- print(f"\nβ {failure.get('description')}")
- print(f" Method: {failure.get('method')}")
- print(f" Error: {failure.get('error', 'Unknown')[:200]}...")
-
-
-# Test scenarios
-TEST_SCENARIOS = [
- {
- "description": "Simple prompt (should work)",
- "prompt": "Say hello in one sentence."
- },
- {
- "description": "Medium complexity prompt",
- "prompt": "Generate a list of the first 100 prime numbers."
- },
- {
- "description": "Complex prompt (5000 primes - known to timeout)",
- "prompt": "Generate the first 5000 prime numbers in a table with 10 columns per row."
- },
- {
- "description": "Very simple JSON generation",
- "prompt": "Generate a JSON object with one field 'message' containing 'Hello World'."
- }
-]
-
-
-async def main():
- """Run OpenAI timeout analysis tests."""
- tester = OpenAITimeoutTester()
-
- print("="*80)
- print("OPENAI TIMEOUT ANALYSIS TEST")
- print("="*80)
- print("\nThis test analyzes why OpenAI calls timeout in functional tests.")
- print("It compares direct connector calls vs AiService calls.\n")
-
- await tester.initialize()
-
- # Test timeout configuration first
- await tester.testTimeoutConfiguration()
-
- # Test each scenario with both methods
- for scenario in TEST_SCENARIOS:
- prompt = scenario["prompt"]
- description = scenario["description"]
-
- # Test 1: Direct connector call
- await tester.testDirectOpenAICall(
- prompt=f"{description} - {prompt}",
- description=f"{description} (Direct Connector)"
- )
-
- # Wait a bit between tests
- await asyncio.sleep(2)
-
- # Test 2: Through AiService
- await tester.testThroughAiService(
- prompt=f"{description} - {prompt}",
- description=f"{description} (AiService)"
- )
-
- # Wait between scenarios
- await asyncio.sleep(3)
-
- # Print summary
- tester.printSummary()
-
- print(f"\n{'='*80}")
- print("TEST COMPLETE")
- print(f"{'='*80}")
-
-
-if __name__ == "__main__":
- asyncio.run(main())
-
diff --git a/tests/functional/test05_workflow_with_documents.py b/tests/functional/test05_workflow_with_documents.py
new file mode 100644
index 00000000..af1c9d7b
--- /dev/null
+++ b/tests/functional/test05_workflow_with_documents.py
@@ -0,0 +1,351 @@
+#!/usr/bin/env python3
+"""
+Workflow Test with Documents - Tests chat workflow execution with uploaded documents
+Simulates the UI route flow: upload files, start workflow with prompt and documents
+"""
+
+import asyncio
+import json
+import sys
+import os
+import time
+from typing import Dict, Any, List, Optional
+
+# Add the gateway to path (go up 2 levels from tests/functional/)
+_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+if _gateway_path not in sys.path:
+ sys.path.insert(0, _gateway_path)
+
+# Import the service initialization
+from modules.services import getInterface as getServices
+from modules.datamodels.datamodelChat import UserInputRequest, WorkflowModeEnum
+from modules.datamodels.datamodelUam import User
+from modules.features.chatPlayground.mainChatPlayground import chatStart
+import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects
+
+
+class WorkflowWithDocumentsTester:
+ def __init__(self):
+ # Use root user for testing (has full access to everything)
+ from modules.interfaces.interfaceDbAppObjects import getRootInterface
+ rootInterface = getRootInterface()
+ self.testUser = rootInterface.currentUser
+
+ # Initialize services using the existing system
+ self.services = getServices(self.testUser, None) # Test user, no workflow
+ self.workflow = None
+ self.testResults = {}
+
+ async def initialize(self):
+ """Initialize the test environment."""
+ # Set logging level to INFO to see workflow progress
+ import logging
+ logging.getLogger().setLevel(logging.INFO)
+
+ print(f"Initialized test with user: {self.testUser.id}")
+ print(f"Mandate ID: {self.testUser.mandateId}")
+
+ def createCsvTemplate(self) -> str:
+ """Create a CSV template file for prime numbers."""
+ csvContent = """Primzahl,Index
+2,1
+3,2
+5,3
+7,4
+11,5
+13,6
+17,7
+19,8
+23,9
+29,10
+"""
+ return csvContent
+
+ def createSecondDocument(self) -> str:
+ """Create a second text document with instructions."""
+ docContent = """Anweisungen zur Primzahlgenerierung:
+
+1. Generiere die ersten 5000 Primzahlen
+2. Formatiere sie in einer Tabelle mit 10 Spalten pro Zeile
+3. Verwende das bereitgestellte CSV-Vorlagenformat
+4. Stelle sicher, dass alle Zahlen korrekt formatiert sind
+5. FΓΌge eine Index-Spalte hinzu, die bei 1 beginnt
+
+
+"""
+ return docContent
+
+ async def uploadFiles(self) -> List[str]:
+ """Upload test files to the filesystem and return their file IDs."""
+ print("\n" + "="*60)
+ print("UPLOADING TEST FILES")
+ print("="*60)
+
+ fileIds = []
+
+ # Create CSV template file
+ csvContent = self.createCsvTemplate()
+ csvFileName = "prime_numbers_template.csv"
+
+ print(f"Creating CSV template: {csvFileName}")
+ print(f"Content length: {len(csvContent)} bytes")
+
+ # Create file in component storage
+ csvFileItem = self.services.interfaceDbComponent.createFile(
+ name=csvFileName,
+ mimeType="text/csv",
+ content=csvContent.encode('utf-8')
+ )
+ # Persist file data
+ self.services.interfaceDbComponent.createFileData(csvFileItem.id, csvContent.encode('utf-8'))
+
+ fileIds.append(csvFileItem.id)
+ print(f"✅ Created CSV file with ID: {csvFileItem.id}")
+ print(f" File name: {csvFileItem.fileName}")
+ print(f" MIME type: {csvFileItem.mimeType}")
+
+ # Create second text document
+ docContent = self.createSecondDocument()
+ docFileName = "prime_numbers_instructions.txt"
+
+ print(f"\nCreating instruction document: {docFileName}")
+ print(f"Content length: {len(docContent)} bytes")
+
+ # Create file in component storage
+ docFileItem = self.services.interfaceDbComponent.createFile(
+ name=docFileName,
+ mimeType="text/plain",
+ content=docContent.encode('utf-8')
+ )
+ # Persist file data
+ self.services.interfaceDbComponent.createFileData(docFileItem.id, docContent.encode('utf-8'))
+
+ fileIds.append(docFileItem.id)
+ print(f"✅ Created instruction file with ID: {docFileItem.id}")
+ print(f" File name: {docFileItem.fileName}")
+ print(f" MIME type: {docFileItem.mimeType}")
+
+ return fileIds
+
+ async def startWorkflow(self, prompt: str, fileIds: List[str]) -> None:
+ """Start a chat workflow with prompt and documents."""
+ print("\n" + "="*60)
+ print("STARTING WORKFLOW")
+ print("="*60)
+
+ print(f"Prompt: {prompt}")
+ print(f"Number of files: {len(fileIds)}")
+ print(f"File IDs: {fileIds}")
+
+ # Create UserInputRequest
+ userInput = UserInputRequest(
+ prompt=prompt,
+ listFileId=fileIds,
+ userLanguage="en"
+ )
+
+ # Start workflow (this is async and returns immediately)
+ print("\nCalling chatStart...")
+ self.workflow = await chatStart(
+ currentUser=self.testUser,
+ userInput=userInput,
+ workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
+ workflowId=None
+ )
+
+ print(f"✅ Workflow started with ID: {self.workflow.id}")
+ print(f" Status: {self.workflow.status}")
+ print(f" Mode: {self.workflow.workflowMode}")
+ print(f" Current Round: {self.workflow.currentRound}")
+
+ async def waitForWorkflowCompletion(self, maxWaitTime: int = 300) -> bool:
+ """Wait for workflow to complete, checking status periodically."""
+ print("\n" + "="*60)
+ print("WAITING FOR WORKFLOW COMPLETION")
+ print("="*60)
+
+ if not self.workflow:
+ print("❌ No workflow to wait for")
+ return False
+
+ startTime = time.time()
+ checkInterval = 2 # Check every 2 seconds
+ lastStatus = None
+
+ while time.time() - startTime < maxWaitTime:
+ # Get current workflow status
+ interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
+ currentWorkflow = interfaceDbChat.getWorkflow(self.workflow.id)
+
+ if not currentWorkflow:
+ print("❌ Workflow not found in database")
+ return False
+
+ currentStatus = currentWorkflow.status
+
+ # Print status if it changed
+ if currentStatus != lastStatus:
+ print(f"Workflow status: {currentStatus} (elapsed: {int(time.time() - startTime)}s)")
+ lastStatus = currentStatus
+
+ # Check if workflow is complete
+ if currentStatus in ["completed", "stopped", "failed"]:
+ self.workflow = currentWorkflow
+ print(f"\n✅ Workflow finished with status: {currentStatus}")
+ return currentStatus == "completed"
+
+ # Wait before next check
+ await asyncio.sleep(checkInterval)
+
+ print(f"\n⚠️ Workflow did not complete within {maxWaitTime} seconds")
+ print(f" Final status: {self.workflow.status}")
+ return False
+
+ def analyzeWorkflowResults(self) -> Dict[str, Any]:
+ """Analyze workflow results and extract information."""
+ print("\n" + "="*60)
+ print("ANALYZING WORKFLOW RESULTS")
+ print("="*60)
+
+ if not self.workflow:
+ return {"error": "No workflow to analyze"}
+
+ interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
+ workflow = interfaceDbChat.getWorkflow(self.workflow.id)
+
+ if not workflow:
+ return {"error": "Workflow not found"}
+
+ # Get unified chat data
+ chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)
+
+ # Count messages
+ messages = chatData.get("messages", [])
+ userMessages = [m for m in messages if m.get("role") == "user"]
+ assistantMessages = [m for m in messages if m.get("role") == "assistant"]
+
+ # Count documents
+ documents = chatData.get("documents", [])
+
+ # Get logs
+ logs = chatData.get("logs", [])
+
+ # Get stats
+ stats = chatData.get("stats", [])
+
+ results = {
+ "workflowId": workflow.id,
+ "status": workflow.status,
+ "workflowMode": str(workflow.workflowMode) if hasattr(workflow, 'workflowMode') else None,
+ "currentRound": workflow.currentRound,
+ "totalTasks": workflow.totalTasks,
+ "totalActions": workflow.totalActions,
+ "messageCount": len(messages),
+ "userMessageCount": len(userMessages),
+ "assistantMessageCount": len(assistantMessages),
+ "documentCount": len(documents),
+ "logCount": len(logs),
+ "statCount": len(stats),
+ "messages": messages,
+ "documents": documents,
+ "logs": logs,
+ "stats": stats
+ }
+
+ print(f"Workflow ID: {results['workflowId']}")
+ print(f"Status: {results['status']}")
+ print(f"Mode: {results['workflowMode']}")
+ print(f"Round: {results['currentRound']}")
+ print(f"Tasks: {results['totalTasks']}")
+ print(f"Actions: {results['totalActions']}")
+ print(f"Messages: {results['messageCount']} (User: {results['userMessageCount']}, Assistant: {results['assistantMessageCount']})")
+ print(f"Documents: {results['documentCount']}")
+ print(f"Logs: {results['logCount']}")
+ print(f"Stats: {results['statCount']}")
+
+ # Print first user message
+ if userMessages:
+ print(f"\nFirst user message:")
+ print(f" {userMessages[0].get('message', '')[:200]}...")
+
+ # Print last assistant message
+ if assistantMessages:
+ print(f"\nLast assistant message:")
+ lastMsg = assistantMessages[-1]
+ print(f" {lastMsg.get('message', '')[:200]}...")
+ if lastMsg.get('documents'):
+ print(f" Documents attached: {len(lastMsg['documents'])}")
+
+ # Print document names
+ if documents:
+ print(f"\nGenerated documents:")
+ for doc in documents:
+ print(f" - {doc.get('fileName', 'unknown')} ({doc.get('fileSize', 0)} bytes)")
+
+ return results
+
+ async def runTest(self):
+ """Run the complete test."""
+ print("\n" + "="*80)
+ print("WORKFLOW TEST WITH DOCUMENTS")
+ print("="*80)
+
+ try:
+ # Initialize
+ await self.initialize()
+
+ # Upload files
+ fileIds = await self.uploadFiles()
+
+ # Start workflow with prompt and files
+ prompt = "Generiere die ersten 5000 Primzahlen in einer Tabelle mit 10 Spalten pro Zeile."
+ await self.startWorkflow(prompt, fileIds)
+
+ # Wait for completion
+ completed = await self.waitForWorkflowCompletion(maxWaitTime=300)
+
+ # Analyze results
+ results = self.analyzeWorkflowResults()
+
+ self.testResults = {
+ "completed": completed,
+ "results": results
+ }
+
+ print("\n" + "="*80)
+ print("TEST SUMMARY")
+ print("="*80)
+ print(f"Workflow completed: {'✅' if completed else '❌'}")
+ print(f"Status: {results.get('status', 'unknown')}")
+ print(f"Messages: {results.get('messageCount', 0)}")
+ print(f"Documents: {results.get('documentCount', 0)}")
+
+ return self.testResults
+
+ except Exception as e:
+ import traceback
+ print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}")
+ print(f"Traceback:\n{traceback.format_exc()}")
+ self.testResults = {
+ "completed": False,
+ "error": str(e),
+ "traceback": traceback.format_exc()
+ }
+ return self.testResults
+
+
+async def main():
+ """Run workflow test with documents."""
+ tester = WorkflowWithDocumentsTester()
+ results = await tester.runTest()
+
+ # Print final results as JSON for easy parsing
+ print("\n" + "="*80)
+ print("FINAL RESULTS (JSON)")
+ print("="*80)
+ print(json.dumps(results, indent=2, default=str))
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
+