diff --git a/env_dev.env b/env_dev.env index da72e528..95b2b91e 100644 --- a/env_dev.env +++ b/env_dev.env @@ -73,7 +73,7 @@ Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET = DEV_ENC:Z0FBQUFBQm8xSUpEbm0yRUJ6VUJK # Debug Configuration APP_DEBUG_CHAT_WORKFLOW_ENABLED = True -APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat +APP_DEBUG_CHAT_WORKFLOW_DIR = D:/Athi/Local/Web/poweron/local/debug # Manadate Pre-Processing Servers PREPROCESS_ALTHAUS_CHAT_SECRET = (empty) \ No newline at end of file diff --git a/env_prod.env b/env_prod.env index c7699b03..0daaff02 100644 --- a/env_prod.env +++ b/env_prod.env @@ -73,7 +73,7 @@ Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z4d3Z4d2x6N1F # Debug Configuration APP_DEBUG_CHAT_WORKFLOW_ENABLED = FALSE -APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat# Development Environment Configuration +APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat # Manadate Pre-Processing Servers PREPROCESS_ALTHAUS_CHAT_SECRET = kj823u90209mj020394jp2msakhfkjashjkf \ No newline at end of file diff --git a/modules/aicore/aicorePluginAnthropic.py b/modules/aicore/aicorePluginAnthropic.py index c26bdaf2..422056d0 100644 --- a/modules/aicore/aicorePluginAnthropic.py +++ b/modules/aicore/aicorePluginAnthropic.py @@ -26,8 +26,10 @@ class AiAnthropic(BaseConnectorAi): self.apiKey = self.config["apiKey"] # HttpClient for API calls + # Timeout set to 600 seconds (10 minutes) for complex requests that may take longer + # Document generation and complex AI operations can take significantly longer self.httpClient = httpx.AsyncClient( - timeout=120.0, # Longer timeout for complex requests + timeout=600.0, headers={ "x-api-key": self.apiKey, "anthropic-version": "2023-06-01", # Anthropic API Version diff --git a/modules/aicore/aicorePluginPerplexity.py b/modules/aicore/aicorePluginPerplexity.py index 86e06898..2a6f0890 100644 --- a/modules/aicore/aicorePluginPerplexity.py +++ b/modules/aicore/aicorePluginPerplexity.py @@ -27,7 +27,7 @@ class AiPerplexity(BaseConnectorAi): # HttpClient for API calls self.httpClient = httpx.AsyncClient( - timeout=120.0, # Longer timeout for complex requests + timeout=600.0, # Timeout set to 600 seconds (10 minutes) for complex requests that may take longer headers={ "Authorization": f"Bearer {self.apiKey}", "Content-Type": "application/json", diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index 055a8b9e..ada6cd22 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -33,7 +33,7 @@ class AiService: """ self.services = serviceCenter # Only depend on interfaces - self.aiObjects = None # Will be initialized in create() or _ensureAiObjectsInitialized() + self.aiObjects = None # Will be initialized in create() or ensureAiObjectsInitialized() # Submodules initialized as None - will be set in _initializeSubmodules() after aiObjects is ready self.extractionService = None @@ -46,7 +46,7 @@ class AiService: logger.info("Initializing ExtractionService...") self.extractionService = ExtractionService(self.services) - async def _ensureAiObjectsInitialized(self): + async def ensureAiObjectsInitialized(self): """Ensure aiObjects is initialized and submodules are ready.""" if self.aiObjects is None: logger.info("Lazy initializing AiObjects...") @@ -296,8 +296,10 @@ Respond with ONLY a JSON object in this exact format: logger.warning(f"Iteration {iteration}: No sections extracted, stopping") break - # Add new sections to accumulator - allSections.extend(extractedSections) + # Merge new sections with existing sections intelligently + # This handles the STANDARD CASE: broken JSON iterations must be merged together + # The break can occur anywhere - in any section, at any depth + allSections = self._mergeSectionsIntelligently(allSections, extractedSections, iteration) # Check if we should continue (completion detection) if self._shouldContinueGeneration(allSections, iteration, wasJsonComplete, result): @@ -331,6 +333,317 @@ Respond with ONLY a JSON object in this exact format: return final_result + def _mergeSectionsIntelligently( + self, + existingSections: List[Dict[str, Any]], + newSections: List[Dict[str, Any]], + iteration: int + ) -> List[Dict[str, Any]]: + """ + Intelligently merge sections from multiple iterations. + + This is a GENERIC merging strategy that handles broken JSON iterations. + The break can occur anywhere - in any section, at any depth. + + Merging strategies (in order of priority): + 1. Same Section ID: Merge sections with identical IDs + 2. Same Content-Type + Position: If last section is incomplete and new section continues it + 3. Same Order: Merge sections with same order value + 4. Structural Analysis: Detect continuation based on content structure + + Args: + existingSections: Sections accumulated from previous iterations + newSections: Sections extracted from current iteration + iteration: Current iteration number + + Returns: + Merged list of sections + """ + if not newSections: + return existingSections + + if not existingSections: + return newSections + + mergedSections = existingSections.copy() + + for newSection in newSections: + merged = False + + # Strategy 1: Same Section ID - merge directly + newSectionId = newSection.get("id") + if newSectionId: + for i, existingSection in enumerate(mergedSections): + if existingSection.get("id") == newSectionId: + # Merge sections with same ID + mergedSections[i] = self._mergeSectionContent(existingSection, newSection, iteration) + merged = True + logger.debug(f"Iteration {iteration}: Merged section by ID '{newSectionId}'") + break + + if merged: + continue + + # Strategy 2: Same Content-Type + Position (continuation detection) + # Check if last section is incomplete and new section continues it + if mergedSections: + lastSection = mergedSections[-1] + lastContentType = lastSection.get("content_type") + newContentType = newSection.get("content_type") + + if lastContentType == newContentType: + # Same content type - check if last section is incomplete + if self._isSectionIncomplete(lastSection): + # Last section is incomplete, merge with new section + mergedSections[-1] = self._mergeSectionContent(lastSection, newSection, iteration) + merged = True + logger.debug(f"Iteration {iteration}: Merged section by content-type continuation ({lastContentType})") + continue + + # Strategy 3: Same Order value + newOrder = newSection.get("order") + if newOrder is not None: + for i, existingSection in enumerate(mergedSections): + existingOrder = existingSection.get("order") + if existingOrder is not None and existingOrder == newOrder: + # Merge sections with same order + mergedSections[i] = self._mergeSectionContent(existingSection, newSection, iteration) + merged = True + logger.debug(f"Iteration {iteration}: Merged section by order {newOrder}") + break + + if merged: + continue + + # Strategy 4: Structural Analysis - detect continuation + # For code_block: if last section is code_block and new section is also code_block, merge + if mergedSections: + lastSection = mergedSections[-1] + if (lastSection.get("content_type") == "code_block" and + newSection.get("content_type") == "code_block"): + # Both are code blocks - merge them + mergedSections[-1] = self._mergeSectionContent(lastSection, newSection, iteration) + merged = True + logger.debug(f"Iteration {iteration}: Merged code_block sections by structural analysis") + continue + + # No merge strategy matched - add as new section + if not merged: + mergedSections.append(newSection) + logger.debug(f"Iteration {iteration}: Added new section '{newSection.get('id', 'no-id')}' ({newSection.get('content_type', 'unknown')})") + + return mergedSections + + def _isSectionIncomplete(self, section: Dict[str, Any]) -> bool: + """ + Check if a section is incomplete (broken at the end). + + This detects incomplete sections based on content analysis: + - Code blocks: ends mid-line, ends with comma, ends with incomplete structure + - Text sections: ends mid-sentence, ends with incomplete structure + - Other types: check for incomplete elements + """ + contentType = section.get("content_type", "") + elements = section.get("elements", []) + + if not elements: + return False + + # Handle list of elements + if isinstance(elements, list) and len(elements) > 0: + lastElement = elements[-1] + else: + lastElement = elements + + if not isinstance(lastElement, dict): + return False + + # Check code_block for incomplete code + if contentType == "code_block": + code = lastElement.get("code", "") + if code: + # Check if code ends incompletely: + # - Ends with comma (incomplete CSV line) + # - Ends with number but no newline (incomplete line) + # - Ends mid-token (e.g., "23431,23" - incomplete number) + codeStripped = code.rstrip() + if codeStripped: + # Check for incomplete patterns + if codeStripped.endswith(',') or (',' in codeStripped and not codeStripped.endswith('\n')): + # Ends with comma or has comma but no final newline - likely incomplete + return True + # Check if last line is incomplete (doesn't end with newline and has partial content) + if not code.endswith('\n') and codeStripped: + # No final newline - might be incomplete + # More sophisticated: check if last number is complete + lastLine = codeStripped.split('\n')[-1] + if lastLine and ',' in lastLine: + # Has commas but might be incomplete + parts = lastLine.split(',') + if parts and len(parts[-1]) < 5: # Last part is very short - might be incomplete + return True + + # Check paragraph/text for incomplete sentences + if contentType in ["paragraph", "heading"]: + text = lastElement.get("text", "") + if text: + # Simple heuristic: if doesn't end with sentence-ending punctuation + textStripped = text.rstrip() + if textStripped and not textStripped[-1] in '.!?': + # Might be incomplete, but this is less reliable + # Only mark as incomplete if very short (likely cut off) + if len(textStripped) < 20: + return True + + return False + + def _mergeSectionContent( + self, + existingSection: Dict[str, Any], + newSection: Dict[str, Any], + iteration: int + ) -> Dict[str, Any]: + """ + Merge content from two sections. + + Handles different content types: + - code_block: Append code, handle overlaps, merge incomplete lines + - paragraph/heading: Append text + - table: Merge rows + - list: Merge items + - Other: Merge elements + """ + contentType = existingSection.get("content_type", "") + existingElements = existingSection.get("elements", []) + newElements = newSection.get("elements", []) + + if not newElements: + return existingSection + + # Handle list of elements + if isinstance(existingElements, list): + existingElem = existingElements[-1] if existingElements else {} + else: + existingElem = existingElements + + if isinstance(newElements, list): + newElem = newElements[0] if newElements else {} + else: + newElem = newElements + + if not isinstance(existingElem, dict) or not isinstance(newElem, dict): + return existingSection + + # Merge based on content type + if contentType == "code_block": + existingCode = existingElem.get("code", "") + newCode = newElem.get("code", "") + + if existingCode and newCode: + mergedCode = self._mergeCodeBlocks(existingCode, newCode, iteration) + existingElem["code"] = mergedCode + # Preserve language from existing or new + if "language" not in existingElem and "language" in newElem: + existingElem["language"] = newElem["language"] + + elif contentType in ["paragraph", "heading"]: + existingText = existingElem.get("text", "") + newText = newElem.get("text", "") + + if existingText and newText: + # Append text with space if needed + if existingText.rstrip() and not existingText.rstrip()[-1] in '.!?\n': + mergedText = existingText.rstrip() + " " + newText.lstrip() + else: + mergedText = existingText.rstrip() + "\n" + newText.lstrip() + existingElem["text"] = mergedText + + elif contentType == "table": + # Merge table rows + existingRows = existingElem.get("rows", []) + newRows = newElem.get("rows", []) + if existingRows and newRows: + existingElem["rows"] = existingRows + newRows + + elif contentType in ["bullet_list", "numbered_list"]: + # Merge list items + existingItems = existingElem.get("items", []) + newItems = newElem.get("items", []) + if existingItems and newItems: + existingElem["items"] = existingItems + newItems + + # Update section with merged content + mergedSection = existingSection.copy() + if isinstance(existingElements, list): + mergedSection["elements"] = existingElements + else: + mergedSection["elements"] = existingElem + + # Preserve metadata from new section if missing in existing + if "order" not in mergedSection and "order" in newSection: + mergedSection["order"] = newSection["order"] + + return mergedSection + + def _mergeCodeBlocks(self, existingCode: str, newCode: str, iteration: int) -> str: + """ + Merge two code blocks intelligently, handling overlaps and incomplete lines. + """ + if not existingCode: + return newCode + if not newCode: + return existingCode + + existingLines = existingCode.rstrip().split('\n') + newLines = newCode.strip().split('\n') + + if not existingLines or not newLines: + return existingCode + "\n" + newCode + + lastExistingLine = existingLines[-1].strip() + firstNewLine = newLines[0].strip() + + # Strategy 1: Exact overlap - remove duplicate line + if lastExistingLine == firstNewLine: + newLines = newLines[1:] + logger.debug(f"Iteration {iteration}: Removed exact duplicate line in code merge") + + # Strategy 2: Incomplete line merge + # If last existing line ends with comma or is incomplete, merge with first new line + elif lastExistingLine.endswith(',') or (',' in lastExistingLine and len(lastExistingLine.split(',')[-1]) < 5): + # Last line is incomplete - merge with first new line + # Remove trailing comma from existing line + mergedLine = lastExistingLine.rstrip(',') + ',' + firstNewLine.lstrip() + existingLines[-1] = mergedLine + newLines = newLines[1:] + logger.debug(f"Iteration {iteration}: Merged incomplete line with continuation") + + # Strategy 3: Partial overlap detection + # Check if first new line starts with the end of last existing line + elif ',' in lastExistingLine and ',' in firstNewLine: + lastExistingParts = lastExistingLine.split(',') + firstNewParts = firstNewLine.split(',') + + # Check for overlap: if last part of existing matches first part of new + if lastExistingParts and firstNewParts: + lastExistingPart = lastExistingParts[-1].strip() + firstNewPart = firstNewParts[0].strip() + + # If they match, there's overlap + if lastExistingPart == firstNewPart and len(lastExistingParts) > 1: + # Remove overlapping part from new line + newLines[0] = ','.join(firstNewParts[1:]) + logger.debug(f"Iteration {iteration}: Removed partial overlap in code merge") + + # Reconstruct merged code + mergedCode = '\n'.join(existingLines) + if newLines: + if mergedCode and not mergedCode.endswith('\n'): + mergedCode += '\n' + mergedCode += '\n'.join(newLines) + + return mergedCode + def _extractSectionsFromResponse( self, result: str, @@ -513,7 +826,7 @@ Respond with ONLY a JSON object in this exact format: Returns: Planning JSON response """ - await self._ensureAiObjectsInitialized() + await self.ensureAiObjectsInitialized() # Planning calls always use static parameters options = AiCallOptions( @@ -569,7 +882,7 @@ Respond with ONLY a JSON object in this exact format: Returns: AiResponse with content, metadata, and optional documents """ - await self._ensureAiObjectsInitialized() + await self.ensureAiObjectsInitialized() # Create separate operationId for detailed progress tracking workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" diff --git a/modules/services/serviceExtraction/subPromptBuilderExtraction.py b/modules/services/serviceExtraction/subPromptBuilderExtraction.py index a796ea3b..f6329a5c 100644 --- a/modules/services/serviceExtraction/subPromptBuilderExtraction.py +++ b/modules/services/serviceExtraction/subPromptBuilderExtraction.py @@ -156,24 +156,8 @@ Extract the ACTUAL CONTENT from the source documents. Do not use placeholder tex pass # Save extraction prompt to debug file - only if debug enabled - if services: - try: - debug_enabled = services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - import os - from datetime import datetime, UTC - ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - from modules.shared.configuration import APP_CONFIG - logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - logDir = os.path.join(gatewayDir, logDir) - debug_root = os.path.join(logDir, 'debug') - os.makedirs(debug_root, exist_ok=True) - with open(os.path.join(debug_root, f"{ts}_extraction_prompt.txt"), "w", encoding="utf-8") as f: - f.write(adaptive_prompt) - except Exception: - pass + from modules.shared.debugLogger import writeDebugFile + writeDebugFile(adaptive_prompt, "extraction_prompt") return adaptive_prompt diff --git a/modules/shared/debugLogger.py b/modules/shared/debugLogger.py index 08fccc63..c68546bf 100644 --- a/modules/shared/debugLogger.py +++ b/modules/shared/debugLogger.py @@ -25,14 +25,28 @@ def _isDebugEnabled() -> bool: """Check if debug workflow logging is enabled.""" return APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) -def _getDebugDir() -> str: - """Get the debug directory path from configuration.""" - # Get log directory from config (same as used by main logging system) +def _getBaseDebugDir() -> str: + """Get the base debug directory path from configuration.""" + # Check if custom debug directory is configured + customDebugDir = APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_DIR", None) + if customDebugDir: + # Use custom debug directory if configured + if not os.path.isabs(customDebugDir): + # If relative path, make it relative to the gateway directory + gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + customDebugDir = os.path.join(gatewayDir, customDebugDir) + return customDebugDir + + # Default: Get log directory from config (same as used by main logging system) logDir = _resolveLogDir() # Create debug subdirectory within the log directory - debugDir = os.path.join(logDir, 'debug/prompts') - return debugDir + return os.path.join(logDir, 'debug') + +def _getDebugDir() -> str: + """Get the debug prompts directory path from configuration.""" + baseDebugDir = _getBaseDebugDir() + return os.path.join(baseDebugDir, 'prompts') def _getNextSequenceNumber() -> int: """Get the next sequence number by counting existing files.""" @@ -50,7 +64,7 @@ def writeDebugFile(content: str, fileType: str, documents: Optional[List] = None Write debug content to a file with sequential numbering. Writes the content as-is since it's already the final integrated prompt. Includes document list labels for tracing enhancement. - Only writes if debug logging is enabled via APP_DEBUG_CHAT_WORKFLOW_ENABLED config. + Only writes if debug logging is enabled via _isDebugEnabled() function. Args: content: The main content to write (already integrated) @@ -111,9 +125,8 @@ def debugLogToFile(message: str, context: str = "DEBUG") -> None: if not _isDebugEnabled(): return - # Get debug directory - logDir = _resolveLogDir() - debug_dir = os.path.join(logDir, 'debug') + # Get debug directory (use base debug dir, not prompts subdirectory) + debug_dir = _getBaseDebugDir() _ensureDir(debug_dir) # Create debug file path @@ -148,9 +161,9 @@ def storeDebugMessageAndDocuments(message, currentUser) -> None: import json from datetime import datetime, UTC - # Create base debug directory - logDir = _resolveLogDir() - debug_root = os.path.join(logDir, 'debug', 'messages') + # Create base debug directory (use base debug dir, not prompts subdirectory) + baseDebugDir = _getBaseDebugDir() + debug_root = os.path.join(baseDebugDir, 'messages') _ensureDir(debug_root) # Generate timestamp diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py index d9baa462..78f2da3e 100644 --- a/modules/workflows/processing/core/taskPlanner.py +++ b/modules/workflows/processing/core/taskPlanner.py @@ -157,6 +157,11 @@ class TaskPlanner: if 'description' in taskDict and 'objective' not in taskDict: taskDict['objective'] = taskDict.pop('description') + # Ensure objective is always set (required field) + if 'objective' not in taskDict or not taskDict.get('objective'): + logger.warning(f"Task {i+1} missing objective, using fallback") + taskDict['objective'] = actualUserPrompt or 'Task objective not specified' + # Extract format details from workflow intent and populate TaskStep # Use workflow-level intent for format requirements (tasks inherit from workflow) if isinstance(workflowIntent, dict): diff --git a/modules/workflows/processing/modes/modeDynamic.py b/modules/workflows/processing/modes/modeDynamic.py index 6af8e8e1..49097bf4 100644 --- a/modules/workflows/processing/modes/modeDynamic.py +++ b/modules/workflows/processing/modes/modeDynamic.py @@ -367,27 +367,25 @@ class DynamicMode(BaseMode): debugType="paramplan" ) - # Parse JSON response using structured parsing with ActionDefinition model - from modules.shared.jsonUtils import parseJsonWithModel - from modules.datamodels.datamodelWorkflow import ActionDefinition + # Parse JSON response - Stage 2 only returns parameters structure, not full ActionDefinition + from modules.shared.jsonUtils import tryParseJson - try: - # Parse response string as ActionDefinition (Stage 2 adds parameters) - actionDef = parseJsonWithModel(paramsResp, ActionDefinition) - # Extract parameters from parsed model - parameters = actionDef.parameters if actionDef.parameters else {} - - # Extract userMessage from Stage 2 response if available - # Stage 2 can override Stage 1 userMessage with more specific message - if hasattr(actionDef, 'userMessage') and actionDef.userMessage: - selection['userMessage'] = actionDef.userMessage - except ValueError as e: - logger.error(f"Failed to parse ActionDefinition from parameters response: {e}") + jsonObj, parseError, cleanedStr = tryParseJson(paramsResp) + if parseError or not isinstance(jsonObj, dict): + logger.error(f"Failed to parse JSON from parameters response: {parseError}") logger.error(f"Response was: {paramsResp[:500]}...") - raise ValueError(f"AI parameters response invalid: {e}") + raise ValueError(f"AI parameters response invalid JSON: {parseError}") + # Extract parameters from response (Stage 2 only provides parameters, not full ActionDefinition) + parameters = jsonObj.get('parameters', {}) if not isinstance(parameters, dict): raise ValueError("AI parameters response missing 'parameters' object") + + # Extract userMessage from Stage 2 response if available + # Stage 2 can override Stage 1 userMessage with more specific message + userMessage = jsonObj.get('userMessage') + if userMessage: + selection['userMessage'] = userMessage # Merge Stage 1 resource selections into Stage 2 parameters (only if action expects them) try: @@ -731,15 +729,8 @@ class DynamicMode(BaseMode): ) try: - # Parse response string as ReviewResult + # Parse response string as ReviewResult (prompt now correctly asks for "status") decision = parseJsonWithModel(resp, ReviewResult) - - # Map "stop" decision to "success" status for ReviewResult - if hasattr(decision, 'decision') and decision.decision == 'stop': - decision.status = 'success' - elif not hasattr(decision, 'status') or not decision.status: - decision.status = 'continue' - return decision except ValueError as e: logger.warning(f"Failed to parse ReviewResult from response: {e}. Using default.") diff --git a/modules/workflows/processing/shared/placeholderFactory.py b/modules/workflows/processing/shared/placeholderFactory.py index 4f3f6ad8..1f4c5845 100644 --- a/modules/workflows/processing/shared/placeholderFactory.py +++ b/modules/workflows/processing/shared/placeholderFactory.py @@ -10,8 +10,8 @@ NAMING CONVENTION: MAPPING TABLE (keys β†’ function) with usage [taskplan | dynamic]: {{KEY:USER_PROMPT}} -> extractUserPrompt() [taskplan, dynamic] -{{KEY:OVERALL_TASK_CONTEXT}} -> extractOverallTaskContext() [dynamic] -{{KEY:TASK_OBJECTIVE}} -> extractTaskObjective() [dynamic] +{{KEY:OVERALL_TASK_CONTEXT}} -> services.currentUserPromptNormalized (always set in WorkflowManager._sendFirstMessage) [direct] +{{KEY:TASK_OBJECTIVE}} -> context.taskStep.objective (always set in TaskPlanner.generateTaskPlan) [direct] {{KEY:USER_LANGUAGE}} -> extractUserLanguage() [dynamic] {{KEY:LANGUAGE_USER_DETECTED}} -> extractLanguageUserDetected() [taskplan] {{KEY:WORKFLOW_HISTORY}} -> extractWorkflowHistory() [taskplan, dynamic] @@ -38,57 +38,6 @@ from typing import Dict, Any, List logger = logging.getLogger(__name__) from modules.workflows.processing.shared.methodDiscovery import (methods, discoverMethods) -def extractOverallTaskContext(service: Any, context: Any) -> str: - """Extract the original normalized user request (overall task context). Maps to {{KEY:OVERALL_TASK_CONTEXT}}. - Always returns the original user request, not the task objective. - """ - try: - # Always prefer the normalized user prompt from services (original request) - if service: - # Prefer normalized version if available - normalized = getattr(service, 'currentUserPromptNormalized', None) - if normalized: - return normalized - - # Fallback to currentUserPrompt (original request) - currentPrompt = getattr(service, 'currentUserPrompt', None) - if currentPrompt: - return currentPrompt - - # If no services available, try to get from workflow's first message - if hasattr(context, 'workflow') and context.workflow: - messages = getattr(context.workflow, 'messages', []) or [] - if messages: - firstMessage = messages[0] - msgContent = getattr(firstMessage, 'message', None) or '' - if msgContent: - return msgContent - - return 'No overall task context available' - except Exception: - return 'No overall task context available' - -def extractTaskObjective(context: Any) -> str: - """Extract the task objective from taskStep. Maps to {{KEY:TASK_OBJECTIVE}}. - Returns the specific task objective, not the overall user request. - """ - try: - if hasattr(context, 'taskStep') and context.taskStep: - objective = getattr(context.taskStep, 'objective', None) - if objective: - return objective - - # Fallback: try to get from services - services = getattr(context, 'services', None) - if services: - currentPrompt = getattr(services, 'currentUserPrompt', None) - if currentPrompt: - return currentPrompt - - return 'No task objective specified' - except Exception: - return 'No task objective specified' - def extractUserPrompt(context: Any) -> str: """Extract user prompt from context. Maps to {{KEY:USER_PROMPT}}. Prefer the cleaned intent stored on the services object if available via context. @@ -102,7 +51,7 @@ def extractUserPrompt(context: Any) -> str: if services and getattr(services, 'currentUserPrompt', None): rawPrompt = services.currentUserPrompt elif hasattr(context, 'taskStep') and context.taskStep: - rawPrompt = context.taskStep.objective or 'No request specified' + rawPrompt = context.taskStep.objective else: rawPrompt = 'No request specified' @@ -114,7 +63,7 @@ def extractUserPrompt(context: Any) -> str: except Exception: # Robust fallback behavior if hasattr(context, 'taskStep') and context.taskStep: - return context.taskStep.objective or 'No request specified' + return context.taskStep.objective return 'No request specified' def extractWorkflowHistory(service: Any) -> str: diff --git a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py index 7985fdb1..1ed82551 100644 --- a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py +++ b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py @@ -17,16 +17,14 @@ from modules.workflows.processing.shared.placeholderFactory import ( extractLearningsAndImprovements, extractLatestRefinementFeedback, extractWorkflowHistory, - extractOverallTaskContext, - extractTaskObjective, ) from modules.workflows.processing.shared.methodDiscovery import methods, getActionParameterList def generateDynamicPlanSelectionPrompt(services, context: Any, learningEngine=None) -> PromptBundle: """Define placeholders first, then the template; return PromptBundle.""" placeholders: List[PromptPlaceholder] = [ - PromptPlaceholder(label="OVERALL_TASK_CONTEXT", content=extractOverallTaskContext(services, context), summaryAllowed=False), - PromptPlaceholder(label="TASK_OBJECTIVE", content=extractTaskObjective(context), summaryAllowed=False), + PromptPlaceholder(label="OVERALL_TASK_CONTEXT", content=services.currentUserPromptNormalized, summaryAllowed=False), + PromptPlaceholder(label="TASK_OBJECTIVE", content=context.taskStep.objective, summaryAllowed=False), PromptPlaceholder(label="USER_PROMPT", content=extractUserPrompt(context), summaryAllowed=False), PromptPlaceholder(label="USER_LANGUAGE", content=extractUserLanguage(services), summaryAllowed=False), PromptPlaceholder(label="AVAILABLE_DOCUMENTS_SUMMARY", content=extractAvailableDocumentsSummary(services, context), summaryAllowed=True), @@ -197,7 +195,7 @@ Excludes documents/connections/history entirely. learningsText = "" placeholders: List[PromptPlaceholder] = [ - PromptPlaceholder(label="OVERALL_TASK_CONTEXT", content=extractOverallTaskContext(services, context), summaryAllowed=False), + PromptPlaceholder(label="OVERALL_TASK_CONTEXT", content=services.currentUserPromptNormalized, summaryAllowed=False), PromptPlaceholder(label="ACTION_OBJECTIVE", content=actionObjective, summaryAllowed=False), PromptPlaceholder(label="SELECTED_ACTION", content=compoundActionName, summaryAllowed=False), PromptPlaceholder(label="USER_LANGUAGE", content=extractUserLanguage(services), summaryAllowed=False), @@ -296,12 +294,12 @@ OBJECTIVE: '{{KEY:USER_PROMPT}}' DECISION RULES: 1. "continue" = objective NOT fulfilled -2. "stop" = objective fulfilled +2. "success" = objective fulfilled 3. Return ONLY JSON - no other text OUTPUT FORMAT (only JSON object to deliver): {{ - "decision": "continue", + "status": "continue", "reason": "Brief reason for decision" }} diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index 34fa8c12..b6b0a386 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -324,7 +324,7 @@ class WorkflowProcessor: """ try: # Ensure AI service is initialized - await self.services.ai._ensureAiObjectsInitialized() + await self.services.ai.ensureAiObjectsInitialized() # Build complexity detection prompt (language-agnostic, semantic) complexityPrompt = ( @@ -368,8 +368,8 @@ class WorkflowProcessor: # Parse response complexity = "moderate" # Default fallback try: - # Extract response content (AiResponse.content is a string) - responseContent = aiResponse.content if isinstance(aiResponse, str) else (aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse)) + # callAiPlanning returns a string directly, not an object + responseContent = str(aiResponse) if aiResponse else "" # Extract JSON from response jsonStr = extractJsonString(responseContent) @@ -409,7 +409,7 @@ class WorkflowProcessor: """ try: # Ensure AI service is initialized - await self.services.ai._ensureAiObjectsInitialized() + await self.services.ai.ensureAiObjectsInitialized() # Build fast path prompt (understand + execute + deliver in one call) fastPathPrompt = ( @@ -496,7 +496,7 @@ class WorkflowProcessor: from modules.shared.jsonUtils import parseJsonWithModel # Ensure AI service is initialized - await self.services.ai._ensureAiObjectsInitialized() + await self.services.ai.ensureAiObjectsInitialized() # Build combined understanding prompt understandingPrompt = ( diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index 85a6e32f..e5ffd2c6 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -161,23 +161,23 @@ class WorkflowManager: self.services.chat._progressLogger = None self.workflowProcessor = WorkflowProcessor(self.services) - await self._sendFirstMessage(userInput) - # Fast Path Detection and Routing - # Get documents from first message if available - workflow = self.services.workflow + # Process user-uploaded documents from userInput for complexity detection + # This is the correct way: use the input data directly, not workflow state documents = [] - if workflow.messages and len(workflow.messages) > 0: - # Get documents from the first message - firstMessageId = workflow.messages[0] if isinstance(workflow.messages[0], str) else workflow.messages[0].id - firstMessage = self.services.chat.getMessage(firstMessageId) - if firstMessage and hasattr(firstMessage, 'documents'): - documents = firstMessage.documents + if userInput.listFileId: + try: + documents = await self._processFileIds(userInput.listFileId, None) + except Exception as e: + logger.warning(f"Failed to process user fileIds for complexity detection: {e}") - # Detect complexity (AI-based semantic understanding) + # Detect complexity (AI-based semantic understanding) using user input documents complexity = await self.workflowProcessor.detectComplexity(userInput.prompt, documents) logger.info(f"Request complexity detected: {complexity}") + # Now send the first message (which will also process the documents again, but that's fine) + await self._sendFirstMessage(userInput) + # Route to fast path for simple requests if complexity == "simple": logger.info("Routing to fast path for simple request") @@ -342,6 +342,8 @@ class WorkflowManager: logger.info("Skipping user intention analysis for AUTOMATION mode - using direct user input") # For automation mode, use user input directly without AI analysis self.services.currentUserPrompt = userInput.prompt + # Always set currentUserPromptNormalized - use user input directly for automation mode + self.services.currentUserPromptNormalized = userInput.prompt detectedLanguage = None normalizedRequest = None intentText = userInput.prompt @@ -409,13 +411,11 @@ class WorkflowManager: except Exception: pass self.services.currentUserPrompt = intentText or userInput.prompt - try: - if normalizedRequest: - setattr(self.services, 'currentUserPromptNormalized', normalizedRequest) - if contextItems is not None: - setattr(self.services, 'currentUserContextItems', contextItems) - except Exception: - pass + # Always set currentUserPromptNormalized - use normalizedRequest if available, otherwise fallback to currentUserPrompt + normalizedValue = normalizedRequest or intentText or userInput.prompt + self.services.currentUserPromptNormalized = normalizedValue + if contextItems is not None: + self.services.currentUserContextItems = contextItems # Create documents for context items if contextItems and isinstance(contextItems, list): diff --git a/tests/functional/test05_openai_timeout.py b/tests/functional/test05_openai_timeout.py deleted file mode 100644 index 68695c28..00000000 --- a/tests/functional/test05_openai_timeout.py +++ /dev/null @@ -1,367 +0,0 @@ -#!/usr/bin/env python3 -""" -OpenAI Timeout Analysis Test - Tests OpenAI API calls to identify timeout issues -Compares different scenarios to understand why OpenAI calls fail in functional tests but work in module tests. -""" - -import asyncio -import json -import sys -import os -import time -from typing import Dict, Any, List - -# Add the gateway to path (go up 2 levels from tests/functional/) -_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) -if _gateway_path not in sys.path: - sys.path.insert(0, _gateway_path) - -from modules.services import getInterface as getServices -from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum -from modules.datamodels.datamodelChat import ChatWorkflow, WorkflowModeEnum -import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects - - -class OpenAITimeoutTester: - """Test OpenAI API calls to identify timeout issues.""" - - def __init__(self): - # Use root user for testing (has full access to everything) - from modules.interfaces.interfaceDbAppObjects import getRootInterface - rootInterface = getRootInterface() - self.testUser = rootInterface.currentUser - - # Initialize services - self.services = getServices(self.testUser, None) - self.testResults = [] - - async def initialize(self): - """Initialize workflow and services.""" - import logging - import uuid - import time as time_module - - # Set logging level to DEBUG to see detailed logs - logging.getLogger().setLevel(logging.DEBUG) - - # Create and save workflow in database - currentTimestamp = time_module.time() - - testWorkflow = ChatWorkflow( - id=str(uuid.uuid4()), - name="OpenAI Timeout Test Workflow", - status="running", - startedAt=currentTimestamp, - lastActivity=currentTimestamp, - currentRound=1, - currentTask=0, - currentAction=0, - totalTasks=0, - totalActions=0, - mandateId=self.testUser.mandateId, - messageIds=[], - workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC, - maxSteps=5 - ) - - # Save workflow to database - interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser) - workflowDict = testWorkflow.model_dump() - interfaceDbChat.createWorkflow(workflowDict) - - # Set the workflow in services - self.services.workflow = testWorkflow - - print("βœ… Services initialized") - print(f"πŸ“‹ Workflow ID: {testWorkflow.id}") - - async def testDirectOpenAICall(self, prompt: str, description: str) -> Dict[str, Any]: - """Test direct OpenAI API call through the connector.""" - print(f"\n{'='*80}") - print(f"TEST: {description}") - print(f"{'='*80}") - print(f"Prompt: {prompt[:100]}...") - - startTime = time.time() - result = { - "description": description, - "prompt": prompt, - "success": False, - "error": None, - "responseTime": 0, - "responseLength": 0, - "method": "direct_connector" - } - - try: - # Get OpenAI connector directly - from modules.aicore.aicorePluginOpenai import AiOpenai - from modules.datamodels.datamodelAi import AiModelCall, AiModel - - # Initialize connector - connector = AiOpenai() - - # Get the gpt-4o model - models = connector.getModels() - gpt4oModel = None - for model in models: - if model.name == "gpt-4o": - gpt4oModel = model - break - - if not gpt4oModel: - result["error"] = "gpt-4o model not found" - return result - - # Create model call - modelCall = AiModelCall( - model=gpt4oModel, - messages=[{"role": "user", "content": prompt}], - options=AiCallOptions() - ) - - # Make the call - print(f"⏱️ Starting API call...") - response = await connector.callAiBasic(modelCall) - - endTime = time.time() - responseTime = endTime - startTime - - result["success"] = True - result["responseTime"] = responseTime - result["responseLength"] = len(response.content) if response.content else 0 - - print(f"βœ… Success! Response time: {responseTime:.2f}s") - print(f"πŸ“Š Response length: {result['responseLength']} characters") - print(f"πŸ“ Response preview: {response.content[:200] if response.content else 'None'}...") - - except Exception as e: - endTime = time.time() - responseTime = endTime - startTime - - result["error"] = str(e) - result["responseTime"] = responseTime - - print(f"❌ Failed after {responseTime:.2f}s") - print(f"πŸ’₯ Error: {type(e).__name__}: {str(e)}") - import traceback - print(f"πŸ“‹ Traceback:\n{traceback.format_exc()}") - - self.testResults.append(result) - return result - - async def testThroughAiService(self, prompt: str, description: str) -> Dict[str, Any]: - """Test OpenAI call through AiService.callAiContent.""" - print(f"\n{'='*80}") - print(f"TEST: {description}") - print(f"{'='*80}") - print(f"Prompt: {prompt[:100]}...") - - startTime = time.time() - result = { - "description": description, - "prompt": prompt, - "success": False, - "error": None, - "responseTime": 0, - "responseLength": 0, - "method": "ai_service" - } - - try: - from modules.datamodels.datamodelWorkflow import AiResponse - - options = AiCallOptions( - operationType=OperationTypeEnum.DATA_GENERATE - ) - - print(f"⏱️ Starting AI service call...") - aiResponse: AiResponse = await self.services.ai.callAiContent( - prompt=prompt, - options=options, - outputFormat="json" - ) - - endTime = time.time() - responseTime = endTime - startTime - - result["success"] = True - result["responseTime"] = responseTime - - if isinstance(aiResponse, AiResponse): - content = aiResponse.content if aiResponse.content else "" - result["responseLength"] = len(content) - print(f"βœ… Success! Response time: {responseTime:.2f}s") - print(f"πŸ“Š Response length: {result['responseLength']} characters") - print(f"πŸ“ Response preview: {content[:200] if content else 'None'}...") - else: - result["responseLength"] = len(str(aiResponse)) - print(f"βœ… Success! Response time: {responseTime:.2f}s") - print(f"πŸ“Š Response length: {result['responseLength']} characters") - - except Exception as e: - endTime = time.time() - responseTime = endTime - startTime - - result["error"] = str(e) - result["responseTime"] = responseTime - - print(f"❌ Failed after {responseTime:.2f}s") - print(f"πŸ’₯ Error: {type(e).__name__}: {str(e)}") - import traceback - print(f"πŸ“‹ Traceback:\n{traceback.format_exc()}") - - self.testResults.append(result) - return result - - async def testTimeoutConfiguration(self) -> Dict[str, Any]: - """Test timeout configuration of OpenAI connector.""" - print(f"\n{'='*80}") - print("TEST: Timeout Configuration Analysis") - print(f"{'='*80}") - - result = { - "description": "Timeout Configuration Analysis", - "timeout": None, - "httpClientTimeout": None, - "connectorType": None - } - - try: - from modules.aicore.aicorePluginOpenai import AiOpenai - - connector = AiOpenai() - result["connectorType"] = connector.getConnectorType() - - # Check httpClient timeout - if hasattr(connector, 'httpClient'): - httpClient = connector.httpClient - if hasattr(httpClient, 'timeout'): - result["httpClientTimeout"] = str(httpClient.timeout) - print(f"πŸ“‹ HttpClient timeout: {httpClient.timeout}") - else: - print(f"⚠️ HttpClient has no timeout attribute") - - # Check for timeout in config - from modules.shared.configuration import APP_CONFIG - openaiTimeout = APP_CONFIG.get('Connector_AiOpenai_TIMEOUT', None) - if openaiTimeout: - result["timeout"] = openaiTimeout - print(f"πŸ“‹ Config timeout: {openaiTimeout}") - else: - print(f"πŸ“‹ No timeout in config (using default)") - - print(f"βœ… Timeout analysis complete") - - except Exception as e: - result["error"] = str(e) - print(f"❌ Error analyzing timeout: {str(e)}") - import traceback - print(f"πŸ“‹ Traceback:\n{traceback.format_exc()}") - - self.testResults.append(result) - return result - - def printSummary(self): - """Print test summary.""" - print(f"\n{'='*80}") - print("OPENAI TIMEOUT TEST SUMMARY") - print(f"{'='*80}") - - for i, result in enumerate(self.testResults, 1): - print(f"\n[{i}] {result.get('description', 'Unknown')}") - print(f" Method: {result.get('method', 'N/A')}") - print(f" Success: {'βœ…' if result.get('success') else '❌'}") - if result.get('responseTime'): - print(f" Response Time: {result['responseTime']:.2f}s") - if result.get('responseLength'): - print(f" Response Length: {result['responseLength']} characters") - if result.get('error'): - print(f" Error: {result['error'][:200]}...") - if result.get('timeout'): - print(f" Timeout Config: {result['timeout']}") - if result.get('httpClientTimeout'): - print(f" HttpClient Timeout: {result['httpClientTimeout']}") - - # Analyze failures - failures = [r for r in self.testResults if not r.get('success')] - if failures: - print(f"\n{'='*80}") - print(f"FAILURES: {len(failures)}/{len(self.testResults)}") - print(f"{'='*80}") - for failure in failures: - print(f"\n❌ {failure.get('description')}") - print(f" Method: {failure.get('method')}") - print(f" Error: {failure.get('error', 'Unknown')[:200]}...") - - -# Test scenarios -TEST_SCENARIOS = [ - { - "description": "Simple prompt (should work)", - "prompt": "Say hello in one sentence." - }, - { - "description": "Medium complexity prompt", - "prompt": "Generate a list of the first 100 prime numbers." - }, - { - "description": "Complex prompt (5000 primes - known to timeout)", - "prompt": "Generate the first 5000 prime numbers in a table with 10 columns per row." - }, - { - "description": "Very simple JSON generation", - "prompt": "Generate a JSON object with one field 'message' containing 'Hello World'." - } -] - - -async def main(): - """Run OpenAI timeout analysis tests.""" - tester = OpenAITimeoutTester() - - print("="*80) - print("OPENAI TIMEOUT ANALYSIS TEST") - print("="*80) - print("\nThis test analyzes why OpenAI calls timeout in functional tests.") - print("It compares direct connector calls vs AiService calls.\n") - - await tester.initialize() - - # Test timeout configuration first - await tester.testTimeoutConfiguration() - - # Test each scenario with both methods - for scenario in TEST_SCENARIOS: - prompt = scenario["prompt"] - description = scenario["description"] - - # Test 1: Direct connector call - await tester.testDirectOpenAICall( - prompt=f"{description} - {prompt}", - description=f"{description} (Direct Connector)" - ) - - # Wait a bit between tests - await asyncio.sleep(2) - - # Test 2: Through AiService - await tester.testThroughAiService( - prompt=f"{description} - {prompt}", - description=f"{description} (AiService)" - ) - - # Wait between scenarios - await asyncio.sleep(3) - - # Print summary - tester.printSummary() - - print(f"\n{'='*80}") - print("TEST COMPLETE") - print(f"{'='*80}") - - -if __name__ == "__main__": - asyncio.run(main()) - diff --git a/tests/functional/test05_workflow_with_documents.py b/tests/functional/test05_workflow_with_documents.py new file mode 100644 index 00000000..af1c9d7b --- /dev/null +++ b/tests/functional/test05_workflow_with_documents.py @@ -0,0 +1,351 @@ +#!/usr/bin/env python3 +""" +Workflow Test with Documents - Tests chat workflow execution with uploaded documents +Simulates the UI route flow: upload files, start workflow with prompt and documents +""" + +import asyncio +import json +import sys +import os +import time +from typing import Dict, Any, List, Optional + +# Add the gateway to path (go up 2 levels from tests/functional/) +_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if _gateway_path not in sys.path: + sys.path.insert(0, _gateway_path) + +# Import the service initialization +from modules.services import getInterface as getServices +from modules.datamodels.datamodelChat import UserInputRequest, WorkflowModeEnum +from modules.datamodels.datamodelUam import User +from modules.features.chatPlayground.mainChatPlayground import chatStart +import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects + + +class WorkflowWithDocumentsTester: + def __init__(self): + # Use root user for testing (has full access to everything) + from modules.interfaces.interfaceDbAppObjects import getRootInterface + rootInterface = getRootInterface() + self.testUser = rootInterface.currentUser + + # Initialize services using the existing system + self.services = getServices(self.testUser, None) # Test user, no workflow + self.workflow = None + self.testResults = {} + + async def initialize(self): + """Initialize the test environment.""" + # Set logging level to INFO to see workflow progress + import logging + logging.getLogger().setLevel(logging.INFO) + + print(f"Initialized test with user: {self.testUser.id}") + print(f"Mandate ID: {self.testUser.mandateId}") + + def createCsvTemplate(self) -> str: + """Create a CSV template file for prime numbers.""" + csvContent = """Primzahl,Index +2,1 +3,2 +5,3 +7,4 +11,5 +13,6 +17,7 +19,8 +23,9 +29,10 +""" + return csvContent + + def createSecondDocument(self) -> str: + """Create a second text document with instructions.""" + docContent = """Anweisungen zur Primzahlgenerierung: + +1. Generiere die ersten 5000 Primzahlen +2. Formatiere sie in einer Tabelle mit 10 Spalten pro Zeile +3. Verwende das bereitgestellte CSV-Vorlagenformat +4. Stelle sicher, dass alle Zahlen korrekt formatiert sind +5. FΓΌge eine Index-Spalte hinzu, die bei 1 beginnt + + +""" + return docContent + + async def uploadFiles(self) -> List[str]: + """Upload test files to the filesystem and return their file IDs.""" + print("\n" + "="*60) + print("UPLOADING TEST FILES") + print("="*60) + + fileIds = [] + + # Create CSV template file + csvContent = self.createCsvTemplate() + csvFileName = "prime_numbers_template.csv" + + print(f"Creating CSV template: {csvFileName}") + print(f"Content length: {len(csvContent)} bytes") + + # Create file in component storage + csvFileItem = self.services.interfaceDbComponent.createFile( + name=csvFileName, + mimeType="text/csv", + content=csvContent.encode('utf-8') + ) + # Persist file data + self.services.interfaceDbComponent.createFileData(csvFileItem.id, csvContent.encode('utf-8')) + + fileIds.append(csvFileItem.id) + print(f"βœ… Created CSV file with ID: {csvFileItem.id}") + print(f" File name: {csvFileItem.fileName}") + print(f" MIME type: {csvFileItem.mimeType}") + + # Create second text document + docContent = self.createSecondDocument() + docFileName = "prime_numbers_instructions.txt" + + print(f"\nCreating instruction document: {docFileName}") + print(f"Content length: {len(docContent)} bytes") + + # Create file in component storage + docFileItem = self.services.interfaceDbComponent.createFile( + name=docFileName, + mimeType="text/plain", + content=docContent.encode('utf-8') + ) + # Persist file data + self.services.interfaceDbComponent.createFileData(docFileItem.id, docContent.encode('utf-8')) + + fileIds.append(docFileItem.id) + print(f"βœ… Created instruction file with ID: {docFileItem.id}") + print(f" File name: {docFileItem.fileName}") + print(f" MIME type: {docFileItem.mimeType}") + + return fileIds + + async def startWorkflow(self, prompt: str, fileIds: List[str]) -> None: + """Start a chat workflow with prompt and documents.""" + print("\n" + "="*60) + print("STARTING WORKFLOW") + print("="*60) + + print(f"Prompt: {prompt}") + print(f"Number of files: {len(fileIds)}") + print(f"File IDs: {fileIds}") + + # Create UserInputRequest + userInput = UserInputRequest( + prompt=prompt, + listFileId=fileIds, + userLanguage="en" + ) + + # Start workflow (this is async and returns immediately) + print("\nCalling chatStart...") + self.workflow = await chatStart( + currentUser=self.testUser, + userInput=userInput, + workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC, + workflowId=None + ) + + print(f"βœ… Workflow started with ID: {self.workflow.id}") + print(f" Status: {self.workflow.status}") + print(f" Mode: {self.workflow.workflowMode}") + print(f" Current Round: {self.workflow.currentRound}") + + async def waitForWorkflowCompletion(self, maxWaitTime: int = 300) -> bool: + """Wait for workflow to complete, checking status periodically.""" + print("\n" + "="*60) + print("WAITING FOR WORKFLOW COMPLETION") + print("="*60) + + if not self.workflow: + print("❌ No workflow to wait for") + return False + + startTime = time.time() + checkInterval = 2 # Check every 2 seconds + lastStatus = None + + while time.time() - startTime < maxWaitTime: + # Get current workflow status + interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser) + currentWorkflow = interfaceDbChat.getWorkflow(self.workflow.id) + + if not currentWorkflow: + print("❌ Workflow not found in database") + return False + + currentStatus = currentWorkflow.status + + # Print status if it changed + if currentStatus != lastStatus: + print(f"Workflow status: {currentStatus} (elapsed: {int(time.time() - startTime)}s)") + lastStatus = currentStatus + + # Check if workflow is complete + if currentStatus in ["completed", "stopped", "failed"]: + self.workflow = currentWorkflow + print(f"\nβœ… Workflow finished with status: {currentStatus}") + return currentStatus == "completed" + + # Wait before next check + await asyncio.sleep(checkInterval) + + print(f"\n⚠️ Workflow did not complete within {maxWaitTime} seconds") + print(f" Final status: {self.workflow.status}") + return False + + def analyzeWorkflowResults(self) -> Dict[str, Any]: + """Analyze workflow results and extract information.""" + print("\n" + "="*60) + print("ANALYZING WORKFLOW RESULTS") + print("="*60) + + if not self.workflow: + return {"error": "No workflow to analyze"} + + interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser) + workflow = interfaceDbChat.getWorkflow(self.workflow.id) + + if not workflow: + return {"error": "Workflow not found"} + + # Get unified chat data + chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None) + + # Count messages + messages = chatData.get("messages", []) + userMessages = [m for m in messages if m.get("role") == "user"] + assistantMessages = [m for m in messages if m.get("role") == "assistant"] + + # Count documents + documents = chatData.get("documents", []) + + # Get logs + logs = chatData.get("logs", []) + + # Get stats + stats = chatData.get("stats", []) + + results = { + "workflowId": workflow.id, + "status": workflow.status, + "workflowMode": str(workflow.workflowMode) if hasattr(workflow, 'workflowMode') else None, + "currentRound": workflow.currentRound, + "totalTasks": workflow.totalTasks, + "totalActions": workflow.totalActions, + "messageCount": len(messages), + "userMessageCount": len(userMessages), + "assistantMessageCount": len(assistantMessages), + "documentCount": len(documents), + "logCount": len(logs), + "statCount": len(stats), + "messages": messages, + "documents": documents, + "logs": logs, + "stats": stats + } + + print(f"Workflow ID: {results['workflowId']}") + print(f"Status: {results['status']}") + print(f"Mode: {results['workflowMode']}") + print(f"Round: {results['currentRound']}") + print(f"Tasks: {results['totalTasks']}") + print(f"Actions: {results['totalActions']}") + print(f"Messages: {results['messageCount']} (User: {results['userMessageCount']}, Assistant: {results['assistantMessageCount']})") + print(f"Documents: {results['documentCount']}") + print(f"Logs: {results['logCount']}") + print(f"Stats: {results['statCount']}") + + # Print first user message + if userMessages: + print(f"\nFirst user message:") + print(f" {userMessages[0].get('message', '')[:200]}...") + + # Print last assistant message + if assistantMessages: + print(f"\nLast assistant message:") + lastMsg = assistantMessages[-1] + print(f" {lastMsg.get('message', '')[:200]}...") + if lastMsg.get('documents'): + print(f" Documents attached: {len(lastMsg['documents'])}") + + # Print document names + if documents: + print(f"\nGenerated documents:") + for doc in documents: + print(f" - {doc.get('fileName', 'unknown')} ({doc.get('fileSize', 0)} bytes)") + + return results + + async def runTest(self): + """Run the complete test.""" + print("\n" + "="*80) + print("WORKFLOW TEST WITH DOCUMENTS") + print("="*80) + + try: + # Initialize + await self.initialize() + + # Upload files + fileIds = await self.uploadFiles() + + # Start workflow with prompt and files + prompt = "Generiere die ersten 5000 Primzahlen in einer Tabelle mit 10 Spalten pro Zeile." + await self.startWorkflow(prompt, fileIds) + + # Wait for completion + completed = await self.waitForWorkflowCompletion(maxWaitTime=300) + + # Analyze results + results = self.analyzeWorkflowResults() + + self.testResults = { + "completed": completed, + "results": results + } + + print("\n" + "="*80) + print("TEST SUMMARY") + print("="*80) + print(f"Workflow completed: {'βœ…' if completed else '❌'}") + print(f"Status: {results.get('status', 'unknown')}") + print(f"Messages: {results.get('messageCount', 0)}") + print(f"Documents: {results.get('documentCount', 0)}") + + return self.testResults + + except Exception as e: + import traceback + print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}") + print(f"Traceback:\n{traceback.format_exc()}") + self.testResults = { + "completed": False, + "error": str(e), + "traceback": traceback.format_exc() + } + return self.testResults + + +async def main(): + """Run workflow test with documents.""" + tester = WorkflowWithDocumentsTester() + results = await tester.runTest() + + # Print final results as JSON for easy parsing + print("\n" + "="*80) + print("FINAL RESULTS (JSON)") + print("="*80) + print(json.dumps(results, indent=2, default=str)) + + +if __name__ == "__main__": + asyncio.run(main()) +