/**
 * Handles all API communication and data processing for the workflow module
 */

import api from '../shared/apiCalls.js';
import * as WorkflowCoordination from './workflowCoordination.js';

// Reference to the global state
let globalState = null;

// Delay between two regular status polls, in milliseconds
const POLL_INTERVAL_MS = 2000;
// Upper bound for the retry delay after failed status polls, in milliseconds
const MAX_POLL_BACKOFF_MS = 16000;
// Growth factor applied to the retry delay for each consecutive failure
const POLL_BACKOFF_FACTOR = 1.5;
// Consecutive status-poll failures tolerated before the workflow is marked failed
const MAX_STATUS_POLL_FAILURES = 5;
// Consecutive log-poll failures tolerated before a running workflow is marked failed
const MAX_LOG_POLL_FAILURES = 3;
// Maximum number of prompt characters echoed to the console
const PROMPT_PREVIEW_LENGTH = 50;

/**
 * Shortens a prompt for console output, appending '...' when it was cut.
 * @param {string} text - Full prompt text
 * @returns {string} - At most PROMPT_PREVIEW_LENGTH characters, plus '...' if truncated
 */
function previewText(text) {
    const suffix = text.length > PROMPT_PREVIEW_LENGTH ? '...' : '';
    return `${text.substring(0, PROMPT_PREVIEW_LENGTH)}${suffix}`;
}

/**
 * Forwards a response's dataStats object to the coordination layer.
 * Missing counters are reported as 0; a missing dataStats object is ignored.
 * @param {Object} [dataStats] - Object with optional bytesSent, bytesReceived, tokensUsed
 */
function recordDataStats(dataStats) {
    if (!dataStats) {
        return;
    }
    WorkflowCoordination.updateDataStats(
        dataStats.bytesSent || 0,
        dataStats.bytesReceived || 0,
        dataStats.tokensUsed || 0
    );
}

/**
 * Looks up an already known file in globalState.mainView.availableFiles.
 * An exact name + size match is preferred; otherwise the first file with the
 * same name is used.
 * @param {File} file - The file about to be uploaded
 * @returns {Object|null} - Known file object, or null when none matches
 */
function findExistingFile(file) {
    const availableFiles = globalState && globalState.mainView && globalState.mainView.availableFiles;
    if (!availableFiles) {
        return null;
    }
    return availableFiles.find(f => f.name === file.name && f.size === file.size)
        || availableFiles.find(f => f.name === file.name)
        || null;
}

/**
 * Checks whether an error's message contains any of the given fragments.
 * @param {*} error - Caught error value
 * @param {Array<string>} fragments - Substrings to look for
 * @returns {boolean} - True when error.message is a string containing a fragment
 */
function errorMessageIncludes(error, fragments) {
    if (!error || typeof error.message !== 'string') {
        return false;
    }
    return fragments.some(fragment => error.message.includes(fragment));
}

/**
 * Initializes the data management layer
 * @param {Object} globalStateObj - Global application state
 */
function initDataLayer(globalStateObj) {
    console.log("Initializing workflow data layer...");
    globalState = globalStateObj;
    console.log("Workflow data layer successfully initialized");
}

/**
 * Uploads a file and adds it to appropriate list.
 * If a matching file is already known (see findExistingFile) it is returned
 * without uploading. Otherwise the file is uploaded, appended to
 * globalState.mainView.availableFiles and the data statistics are updated.
 * @param {File} file - The file to upload
 * @returns {Promise} - Processed file object
 * @throws Rethrows any error raised while looking up or uploading the file
 */
async function uploadAndAddFile(file) {
    try {
        const existingFile = findExistingFile(file);
        if (existingFile) {
            console.log(`Using existing file: ${existingFile.name} (${existingFile.id})`);
            return existingFile;
        }

        console.log(`Uploading new file: ${file.name}`);
        try {
            const processedFile = await api.uploadFile(file);
            console.log(`File uploaded successfully: ${processedFile.name} (${processedFile.id})`);

            // Add to global available files
            if (globalState && globalState.mainView) {
                if (!globalState.mainView.availableFiles) {
                    globalState.mainView.availableFiles = [];
                }
                globalState.mainView.availableFiles.push(processedFile);
            }

            // The other call sites in this module pass (bytesSent, bytesReceived,
            // tokensUsed); tokensUsed therefore belongs in the third position,
            // not in the bytesReceived slot.
            WorkflowCoordination.updateDataStats(file.size, 0, processedFile.tokensUsed || 0);

            return processedFile;
        } catch (uploadError) {
            console.error("Error uploading file:", uploadError);
            throw uploadError;
        }
    } catch (error) {
        console.error("Error processing file:", error);
        throw error;
    }
}

/**
 * Creates a new workflow with the given input
 * @param {string} promptText - Text for the workflow prompt
 * @param {Array} selectedFiles - Array of selected files (null/undefined is treated as empty)
 * @returns {Promise} - Created workflow
 * @throws Rethrows any error raised by the API call
 */
async function createWorkflow(promptText, selectedFiles) {
    try {
        // Prepare file IDs
        const fileIds = (selectedFiles || []).map(file => file.id);

        // Log the operation
        console.log(`Creating new workflow with prompt: "${previewText(promptText)}"`,
            `and ${fileIds.length} files`);

        // An empty workflow ID is what this module sends to create a new workflow
        const response = await api.submitUserInput("", promptText, fileIds);

        // Update data statistics from the response if available
        recordDataStats(response && response.dataStats);

        console.log("Workflow creation response:");
        console.log(response);

        return response;
    } catch (error) {
        console.error("Error creating workflow:", error);
        throw error;
    }
}

/**
 * Submits user input to a running workflow
 * @param {string} workflowId - ID of the workflow
 * @param {string} prompt - User message
 * @param {Array} listFileId - IDs of additional files (null is treated as empty)
 * @returns {Promise} - API response
 * @throws Rethrows any error raised by the API call
 */
async function submitUserInput(workflowId, prompt, listFileId = []) {
    try {
        const fileIds = listFileId || [];

        console.log(`Submitting user input to workflow ${workflowId}`);
        console.log(`Prompt: "${previewText(prompt)}"`, `with ${fileIds.length} files`);

        // Make API call
        const response = await api.submitUserInput(workflowId, prompt, fileIds);

        // Update data statistics with response stats
        recordDataStats(response && response.dataStats);

        console.log("User input submission response:", response);

        return response;
    } catch (error) {
        console.error("Error submitting user input:", error);
        throw error;
    }
}

/**
 * Stops a running workflow.
 * Polling is switched off before the API call and again if the call fails.
 * @param {string} workflowId - ID of the workflow
 * @returns {Promise} - API response
 * @throws Rethrows any error raised by the API call
 */
async function stopWorkflow(workflowId) {
    try {
        console.log(`Stopping workflow ${workflowId}`);

        // Immediately set pollActive to false to prevent further polling
        WorkflowCoordination.getWorkflowState().pollActive = false;

        // Make API call
        const response = await api.stopWorkflow(workflowId);

        console.log("Workflow stop response:", response);
        return response;
    } catch (error) {
        console.error("Error stopping workflow:", error);

        // Ensure pollActive is set to false even on error
        WorkflowCoordination.getWorkflowState().pollActive = false;

        throw error;
    }
}

/**
 * Polls workflow status and updates state accordingly.
 *
 * One call performs one poll cycle (status, then logs and messages in
 * parallel) and, while workflowState.pollActive stays true, schedules the
 * next cycle with setTimeout. Failures, including an empty or status-less
 * response, are retried with exponential backoff; after more than
 * MAX_STATUS_POLL_FAILURES consecutive failures the workflow is marked
 * 'failed' and polling stops.
 * @param {string} workflowId - ID of the workflow
 */
async function pollWorkflowStatus(workflowId) {
    try {
        if (!workflowId) {
            console.warn("Cannot poll workflow status: No workflow ID provided");
            return;
        }

        const workflowState = WorkflowCoordination.getWorkflowState();

        // Only poll if we have an active polling flag
        if (!workflowState.pollActive) {
            console.log(`Polling stopped: pollActive=${workflowState.pollActive}, status=${workflowState.status}`);
            return;
        }

        console.log(`Polling status for workflow ${workflowId}, pollActive=${workflowState.pollActive}, status=${workflowState.status}`);

        // Get workflow status from API
        const statusResponse = await api.getWorkflowStatus(workflowId);

        // Check if polling should continue after API call
        // This prevents race conditions where polling state changes during API call
        if (!workflowState.pollActive) {
            console.log(`Polling aborted after API call: pollActive=${workflowState.pollActive}`);
            return;
        }

        // Throwing (instead of returning) routes unusable responses through the
        // retry/backoff path below; a plain return would end the polling loop
        // while pollActive is still true.
        if (!statusResponse) {
            throw new Error("No status response received");
        }

        console.log(`Received status for workflow ${workflowId}:`, statusResponse);

        // Update stats if available
        recordDataStats(statusResponse.dataStats);

        // Get status value from response (may be in different locations in the response)
        const status = statusResponse.status ||
            (statusResponse.workflow ? statusResponse.workflow.status : null);

        if (!status) {
            throw new Error("Status value not found in response");
        }

        // A usable response arrived: only consecutive failures should count
        // towards the give-up threshold.
        workflowState.pollFailCount = 0;

        if (status !== workflowState.status) {
            console.log(`Workflow status changed: ${workflowState.status} → ${status}`);

            // Update state based on new status
            WorkflowCoordination.updateWorkflowStatus(status, {
                message: `Workflow status updated to: ${status}`,
                systemMessage: getSystemMessageForStatus(status)
            });

            // If status is 'failed' or 'stopped', stop polling immediately
            // For 'completed', polling continues until 'last' message is received
            if (status === 'failed' || status === 'stopped') {
                workflowState.pollActive = false;
                console.log(`Workflow ${workflowId} reached terminal state: ${status}, polling stopped`);
                return;
            }
        }

        // Poll for logs and messages
        await Promise.all([
            pollWorkflowLogs(workflowId),
            pollWorkflowMessages(workflowId)
        ]);

        // Check polling active flag again after API calls to avoid race conditions
        if (workflowState.pollActive) {
            setTimeout(() => pollWorkflowStatus(workflowId), POLL_INTERVAL_MS);
        } else {
            console.log(`Polling stopped after API calls: pollActive=${workflowState.pollActive}`);
        }
    } catch (error) {
        console.error("Error polling workflow status:", error);

        // Get current state for error handling
        const workflowState = WorkflowCoordination.getWorkflowState();

        // Retry only if we're still supposed to be polling
        if (!workflowState.pollActive) {
            return;
        }

        const previousFailures = workflowState.pollFailCount || 0;
        workflowState.pollFailCount = previousFailures + 1;

        // After multiple failures, show error and stop polling (no retry is scheduled)
        if (workflowState.pollFailCount > MAX_STATUS_POLL_FAILURES) {
            WorkflowCoordination.addLogEntry(
                "Connection issues detected. Please check your network connection.",
                "error"
            );

            WorkflowCoordination.updateWorkflowStatus('failed', {
                message: "Workflow failed due to connection issues",
                systemMessage: "Failed to connect to the server"
            });

            workflowState.pollActive = false;
            return;
        }

        // Exponential backoff: 2000ms for the first retry, growing by 1.5x, capped at 16s
        const backoffTime = Math.min(
            POLL_INTERVAL_MS * Math.pow(POLL_BACKOFF_FACTOR, previousFailures),
            MAX_POLL_BACKOFF_MS
        );

        console.log(`Retrying poll in ${backoffTime}ms (attempt ${workflowState.pollFailCount})`);

        setTimeout(() => pollWorkflowStatus(workflowId), backoffTime);
    }
}

/**
 * Gets appropriate system message for status change
 * @param {string} status - New workflow status
 * @returns {string|null} - System message, or null for statuses without one
 */
function getSystemMessageForStatus(status) {
    switch (status) {
        case 'completed':
            return "Workflow completed successfully";
        case 'failed':
            return "Workflow failed to complete";
        case 'stopped':
            return "Workflow was stopped";
        default:
            return null;
    }
}

/**
 * Polls workflow logs and updates UI.
 *
 * Fetches logs newer than workflowState.lastPolledLogId, adds every log whose
 * ID is not yet known via WorkflowCoordination.addLogEntry and advances
 * lastPolledLogId. Errors are logged, not rethrown; after more than
 * MAX_LOG_POLL_FAILURES consecutive failures while the workflow is 'running'
 * the workflow is marked 'failed' and polling is switched off.
 * @param {string} workflowId - ID of the workflow
 */
async function pollWorkflowLogs(workflowId) {
    try {
        // Get current workflow state
        const workflowState = WorkflowCoordination.getWorkflowState();

        // Get logs from API
        console.info("polling api request param id:", workflowState.lastPolledLogId);
        const logs = await api.getWorkflowLogs(workflowId, workflowState.lastPolledLogId);
        console.log("Received workflow logs:", logs); // DEBUG

        // The request succeeded, so earlier log-poll failures no longer count
        workflowState.logPollFailCount = 0;

        // Update data statistics if available in response
        if (logs && logs.dataStats) {
            WorkflowCoordination.updateDataStats(0, logs.dataStats.bytesReceived || 0);
        }

        // Accept a bare array or an object wrapping the array.
        // NOTE(review): the `logs` field name mirrors the `messages` wrapper
        // handled in pollWorkflowMessages — confirm against the API.
        let logsArray = null;
        if (Array.isArray(logs)) {
            logsArray = logs;
        } else if (logs && Array.isArray(logs.logs)) {
            logsArray = logs.logs;
        }

        if (!logsArray) {
            console.log("ERROR: Log meessages in wrong format");
            return;
        }

        if (logsArray.length === 0) {
            return;
        }

        console.log(`Processing ${logsArray.length} new logs`); // Debug logging

        // Process new logs
        const knownLogIds = new Set((workflowState.logs || []).map(log => log.id));

        logsArray.forEach(log => {
            // Only process new logs
            if (knownLogIds.has(log.id)) {
                return;
            }
            // Remember the ID so a duplicate inside this batch is not added twice
            knownLogIds.add(log.id);

            // Add log entry, substituting defaults for missing properties
            WorkflowCoordination.addLogEntry(
                log.message || 'No message',
                log.type || 'info',
                log.details || null,
                log.agentName || null,
                log.progress
            );

            // Store ID of last processed log
            workflowState.lastPolledLogId = log.id;
            console.info("polling api next param id:", workflowState.lastPolledLogId);
        });
    } catch (error) {
        console.error("Error polling workflow logs:", error);

        const workflowState = WorkflowCoordination.getWorkflowState();

        // Count failures here: this function swallows its errors, so the
        // status poller never sees them and its pollFailCount is unaffected.
        workflowState.logPollFailCount = (workflowState.logPollFailCount || 0) + 1;

        // Only mark as failed after repeated errors in running state
        if (workflowState.logPollFailCount > MAX_LOG_POLL_FAILURES && workflowState.status === 'running') {
            WorkflowCoordination.updateWorkflowStatus('failed', {
                message: "Failed to retrieve workflow logs",
                systemMessage: "Error communicating with server"
            });
            workflowState.pollActive = false;
        }
    }
}

/**
 * Polls workflow messages and updates UI.
 *
 * Fetches messages newer than workflowState.lastPolledMessageId, adds every
 * message whose ID is not yet known via WorkflowCoordination.addChatMessage
 * and advances lastPolledMessageId. A message with status 'last' switches
 * polling off. Errors are logged, not rethrown.
 * @param {string} workflowId - ID of the workflow
 */
async function pollWorkflowMessages(workflowId) {
    try {
        // Get current workflow state
        const workflowState = WorkflowCoordination.getWorkflowState();

        // Get messages from API
        const messagesResponse = await api.getWorkflowMessages(workflowId, workflowState.lastPolledMessageId);

        // Extract messages array and handle different response formats
        let messages = [];
        if (Array.isArray(messagesResponse)) {
            messages = messagesResponse;
        } else if (messagesResponse && messagesResponse.messages) {
            messages = messagesResponse.messages;
        }

        if (messages.length === 0) {
            return;
        }

        // Process new messages
        const knownMessageIds = new Set((workflowState.chatMessages || []).map(msg => msg.id));

        messages.forEach(message => {
            // Only process new messages
            if (knownMessageIds.has(message.id)) {
                return;
            }
            // Remember the ID so a duplicate inside this batch is not added twice
            knownMessageIds.add(message.id);

            WorkflowCoordination.addChatMessage({
                id: message.id,
                agentName: message.agentName || '',
                content: message.content || '',
                role: message.role || '',
                timestamp: message.startedAt || message.timestamp || new Date().toISOString(),
                documents: message.documents || [],
                status: message.status || null
            });

            workflowState.lastPolledMessageId = message.id;

            // Check for last message
            if (message.status === 'last') {
                console.log("Last message found, stopping polling");
                workflowState.pollActive = false;
            }
        });
    } catch (error) {
        console.error("Error polling workflow messages:", error);
    }
}

/**
 * Deletes a message from a workflow
 * @param {string} workflowId - ID of the workflow
 * @param {string} messageId - ID of the message to delete
 * @returns {Promise} - True if successful (or the error message contains "404"), false otherwise
 */
async function deleteWorkflowMessage(workflowId, messageId) {
    if (!workflowId || !messageId) {
        console.error("Invalid parameters for deleteWorkflowMessage");
        return false;
    }

    try {
        // Make API call
        const success = await api.deleteWorkflowMessage(workflowId, messageId);

        console.log("Delete message response:", success);

        // Return success status
        return Boolean(success);
    } catch (error) {
        console.error("Error deleting message:", error);

        // Be forgiving with 404 errors - the message might already be gone
        if (errorMessageIncludes(error, ["404"])) {
            console.log("Message not found (404), considering it deleted");
            return true;
        }

        return false;
    }
}

/**
 * Deletes a file from a workflow message
 * @param {string} workflowId - ID of the workflow
 * @param {string} messageId - ID of the message
 * @param {string} fileId - ID of the file to delete
 * @returns {Promise} - True if successful (or the error message contains "404" / "not found"), false otherwise
 */
async function deleteFileFromMessage(workflowId, messageId, fileId) {
    // Same guard as deleteWorkflowMessage: never call the API with missing IDs
    if (!workflowId || !messageId || !fileId) {
        console.error("Invalid parameters for deleteFileFromMessage");
        return false;
    }

    try {
        console.log(`Deleting file from message: workflow=${workflowId}, message=${messageId}, file=${fileId}`);

        // Make API call
        const success = await api.deleteFileFromMessage(workflowId, messageId, fileId);

        console.log("File deletion response:", success);

        // Return success status
        return Boolean(success);
    } catch (error) {
        console.error("Error deleting file from message:", error);

        // If file not found, still return success to update UI
        if (errorMessageIncludes(error, ["404", "not found"])) {
            console.log("File not found, removing from UI");
            return true;
        }

        return false;
    }
}

// Export functions
export {
    initDataLayer,
    createWorkflow,
    submitUserInput,
    stopWorkflow,
    pollWorkflowStatus,
    pollWorkflowLogs,
    pollWorkflowMessages,
    uploadAndAddFile,
    deleteWorkflowMessage,
    deleteFileFromMessage
};