gateway/static/62_workflowData.js
2025-04-30 00:39:37 +02:00

541 lines
No EOL
19 KiB
JavaScript

/**
* Handles all API communication and data processing for the workflow module
*/
import api from '../shared/apiCalls.js';
import * as WorkflowCoordination from './workflowCoordination.js';
// Module-level reference to the global application state.
// Starts as null and is assigned by initDataLayer(); uploadAndAddFile()
// reads and extends globalState.mainView.availableFiles through it.
let globalState = null;
/**
 * Wires the workflow data layer to the application state.
 *
 * Stores the given object in the module-level `globalState` reference so the
 * other functions of this module can reach it, and logs before and after.
 *
 * @param {Object} globalStateObj - Global application state
 */
function initDataLayer(globalStateObj) {
  console.log("Initializing workflow data layer...");

  // Keep the reference itself (no copy): later reads see the live state.
  globalState = globalStateObj;

  console.log("Workflow data layer successfully initialized");
}
/**
 * Returns the known record for a file, uploading the file only when needed.
 *
 * Lookup happens in `globalState.mainView.availableFiles` (when present):
 * first by exact name and size, then by name alone. A match is returned as is
 * and nothing is uploaded. Otherwise the file is uploaded through
 * `api.uploadFile`, the returned record is appended to the available files,
 * and the data statistics are updated.
 *
 * @param {File} file - The file to upload
 * @returns {Promise<Object>} - The existing or newly uploaded file record
 * @throws Rethrows any upload error after logging it; also throws when the
 *         upload resolves without a file record.
 */
async function uploadAndAddFile(file) {
  try {
    const knownFiles =
      globalState && globalState.mainView && globalState.mainView.availableFiles;

    if (knownFiles) {
      // Prefer an exact (name + size) match, then fall back to the name only.
      const existingFile =
        knownFiles.find(f => f.name === file.name && f.size === file.size) ||
        knownFiles.find(f => f.name === file.name);

      if (existingFile) {
        console.log(`Using existing file: ${existingFile.name} (${existingFile.id})`);
        return existingFile;
      }
    }

    console.log(`Uploading new file: ${file.name}`);
    const uploadedFile = await api.uploadFile(file);
    if (!uploadedFile) {
      // Fail with a clear message instead of a TypeError on `.name` below.
      throw new Error(`Upload of "${file.name}" returned no file record`);
    }
    console.log(`File uploaded successfully: ${uploadedFile.name} (${uploadedFile.id})`);

    // Re-read the state after the await; it may have been (re)initialized meanwhile.
    if (globalState && globalState.mainView) {
      if (!globalState.mainView.availableFiles) {
        globalState.mainView.availableFiles = [];
      }
      globalState.mainView.availableFiles.push(uploadedFile);
    }

    // Argument order follows the other call sites of updateDataStats in this
    // module: (bytesSent, bytesReceived, tokensUsed). The token count must go
    // into the third slot, not into bytesReceived.
    WorkflowCoordination.updateDataStats(file.size, 0, uploadedFile.tokensUsed || 0);

    return uploadedFile;
  } catch (error) {
    console.error("Error processing file:", error);
    throw error;
  }
}
/**
 * Creates a new workflow from a prompt and an optional set of files.
 *
 * Calls `api.submitUserInput` with an empty workflow ID, the prompt text and
 * the IDs of the selected files. When the response carries `dataStats`, the
 * data statistics are updated from it.
 *
 * @param {string} promptText - Text for the workflow prompt
 * @param {Array} selectedFiles - Selected file objects (each with an `id`);
 *        may be omitted or null when no files are attached
 * @returns {Promise<Object>} - The API response for the created workflow
 * @throws Rethrows any error from the API call after logging it.
 */
async function createWorkflow(promptText, selectedFiles) {
  try {
    // A prompt without attachments is valid: treat a missing list as empty
    // instead of failing on `.map` of undefined/null.
    const files = selectedFiles === undefined || selectedFiles === null ? [] : selectedFiles;
    const fileIds = files.map(file => file.id);

    console.log(`Creating new workflow with prompt: "${promptText.substring(0, 50)}${promptText.length > 50 ? '...' : ''}"`,
      `and ${fileIds.length} files`);

    // The empty workflow ID is what this module passes when no workflow exists yet.
    const response = await api.submitUserInput("", promptText, fileIds);

    if (response && response.dataStats) {
      WorkflowCoordination.updateDataStats(
        response.dataStats.bytesSent || 0,
        response.dataStats.bytesReceived || 0,
        response.dataStats.tokensUsed || 0
      );
    }

    console.log("Workflow creation response:", response);
    return response;
  } catch (error) {
    console.error("Error creating workflow:", error);
    throw error;
  }
}
/**
 * Sends a user message (and optional file IDs) to an existing workflow.
 *
 * Logs a preview of the prompt (first 50 characters), forwards the call to
 * `api.submitUserInput` and, when the response contains `dataStats`, feeds
 * those numbers into the data statistics.
 *
 * @param {string} workflowId - ID of the workflow
 * @param {string} prompt - User message
 * @param {Array} listFileId - IDs of additional files
 * @returns {Promise<Object>} - API response
 * @throws Rethrows any error from the API call after logging it.
 */
async function submitUserInput(workflowId, prompt, listFileId = []) {
  try {
    console.log(`Submitting user input to workflow ${workflowId}`);

    // Only the beginning of the prompt is logged; longer prompts get an ellipsis.
    const preview = prompt.substring(0, 50);
    const ellipsis = prompt.length > 50 ? '...' : '';
    console.log(`Prompt: "${preview}${ellipsis}"`,
      `with ${listFileId.length} files`);

    const response = await api.submitUserInput(workflowId, prompt, listFileId);

    const stats = response && response.dataStats;
    if (stats) {
      const sent = stats.bytesSent || 0;
      const received = stats.bytesReceived || 0;
      const tokens = stats.tokensUsed || 0;
      WorkflowCoordination.updateDataStats(sent, received, tokens);
    }

    console.log("User input submission response:", response);
    return response;
  } catch (error) {
    console.error("Error submitting user input:", error);
    throw error;
  }
}
/**
 * Requests the server to stop a workflow and switches polling off.
 *
 * The `pollActive` flag of the workflow state is cleared before the API call
 * is made, and cleared again if the call fails, so polling does not continue
 * in either case.
 *
 * @param {string} workflowId - ID of the workflow
 * @returns {Promise<Object>} - API response
 * @throws Rethrows any error from the API call after logging it.
 */
async function stopWorkflow(workflowId) {
  // Clears the polling flag on the current workflow state.
  const haltPolling = () => {
    WorkflowCoordination.getWorkflowState().pollActive = false;
  };

  try {
    console.log(`Stopping workflow ${workflowId}`);

    // Switch polling off first so no further poll starts while the request is in flight.
    haltPolling();

    const response = await api.stopWorkflow(workflowId);
    console.log("Workflow stop response:", response);
    return response;
  } catch (error) {
    console.error("Error stopping workflow:", error);
    // The flag may have been re-enabled while waiting; make sure it ends up off.
    haltPolling();
    throw error;
  }
}
// Delay between two regular status polls.
const POLL_INTERVAL_MS = 2000;
// Factor by which the retry delay grows with every consecutive failure.
const POLL_BACKOFF_FACTOR = 1.5;
// Upper bound for the retry delay.
const POLL_MAX_BACKOFF_MS = 16000;
// Consecutive failures tolerated before the workflow is marked as failed.
const POLL_MAX_FAILURES = 5;

/**
 * Polls the workflow status once and schedules the next poll.
 *
 * One cycle:
 *  1. returns immediately when no ID is given or `pollActive` is off;
 *  2. fetches the status, re-checking `pollActive` after the request;
 *  3. forwards `dataStats` and any status change to WorkflowCoordination;
 *  4. stops polling when the new status is 'failed' or 'stopped'
 *     ('completed' keeps polling; pollWorkflowMessages switches polling off
 *     when it sees a message with status 'last');
 *  5. polls logs and messages in parallel;
 *  6. schedules the next cycle after POLL_INTERVAL_MS while `pollActive` is on.
 *
 * Failures (request errors, an empty response, or a response without a
 * status) are retried with a growing delay. `pollFailCount` counts
 * consecutive failures: it is reset after every successful cycle, and once it
 * exceeds POLL_MAX_FAILURES the workflow is marked as failed and polling
 * stops without scheduling another retry.
 *
 * @param {string} workflowId - ID of the workflow
 * @returns {Promise<void>}
 */
async function pollWorkflowStatus(workflowId) {
  try {
    if (!workflowId) {
      console.warn("Cannot poll workflow status: No workflow ID provided");
      return;
    }

    const workflowState = WorkflowCoordination.getWorkflowState();
    if (!workflowState.pollActive) {
      console.log(`Polling stopped: pollActive=${workflowState.pollActive}, status=${workflowState.status}`);
      return;
    }

    console.log(`Polling status for workflow ${workflowId}, pollActive=${workflowState.pollActive}, status=${workflowState.status}`);
    const statusResponse = await api.getWorkflowStatus(workflowId);

    // The flag may have been cleared while the request was in flight.
    if (!workflowState.pollActive) {
      console.log(`Polling aborted after API call: pollActive=${workflowState.pollActive}`);
      return;
    }

    // Treat an unusable response as a failed attempt so it is retried;
    // returning here would end polling while pollActive stays true.
    if (!statusResponse) {
      throw new Error("No status response received");
    }
    console.log(`Received status for workflow ${workflowId}:`, statusResponse);

    if (statusResponse.dataStats) {
      WorkflowCoordination.updateDataStats(
        statusResponse.dataStats.bytesSent || 0,
        statusResponse.dataStats.bytesReceived || 0,
        statusResponse.dataStats.tokensUsed || 0
      );
    }

    // The status is read from the top level or from a nested `workflow` object.
    const status = statusResponse.status ||
      (statusResponse.workflow ? statusResponse.workflow.status : null);
    if (!status) {
      throw new Error("Status value not found in response");
    }

    if (status !== workflowState.status) {
      console.log(`Workflow status changed: ${workflowState.status} -> ${status}`);
      WorkflowCoordination.updateWorkflowStatus(status, {
        message: `Workflow status updated to: ${status}`,
        systemMessage: getSystemMessageForStatus(status)
      });

      if (status === 'failed' || status === 'stopped') {
        workflowState.pollActive = false;
        console.log(`Workflow ${workflowId} reached terminal state: ${status}, polling stopped`);
        return;
      }
    }

    await Promise.all([
      pollWorkflowLogs(workflowId),
      pollWorkflowMessages(workflowId)
    ]);

    // The cycle completed: failures counted so far were transient.
    workflowState.pollFailCount = 0;

    if (workflowState.pollActive) {
      setTimeout(() => pollWorkflowStatus(workflowId), POLL_INTERVAL_MS);
    } else {
      console.log(`Polling stopped after API calls: pollActive=${workflowState.pollActive}`);
    }
  } catch (error) {
    console.error("Error polling workflow status:", error);

    const workflowState = WorkflowCoordination.getWorkflowState();
    if (!workflowState.pollActive) {
      return;
    }

    const previousFailures = workflowState.pollFailCount || 0;
    workflowState.pollFailCount = previousFailures + 1;

    if (workflowState.pollFailCount > POLL_MAX_FAILURES) {
      // Give up: report, mark as failed and do not schedule another retry.
      WorkflowCoordination.addLogEntry(
        "Connection issues detected. Please check your network connection.",
        "error"
      );
      WorkflowCoordination.updateWorkflowStatus('failed', {
        message: "Workflow failed due to connection issues",
        systemMessage: "Failed to connect to the server"
      });
      workflowState.pollActive = false;
      return;
    }

    // Delay grows with the number of failures before this one, capped at the maximum.
    const backoffTime = Math.min(
      POLL_INTERVAL_MS * Math.pow(POLL_BACKOFF_FACTOR, previousFailures),
      POLL_MAX_BACKOFF_MS
    );
    console.log(`Retrying poll in ${backoffTime}ms (attempt ${previousFailures + 1})`);
    setTimeout(() => pollWorkflowStatus(workflowId), backoffTime);
  }
}
/**
 * Maps a workflow status to the system message shown for it.
 *
 * Only 'completed', 'failed' and 'stopped' have a message; every other value
 * yields null.
 *
 * @param {string} status - New workflow status
 * @returns {string|null} - System message, or null when the status has none
 */
function getSystemMessageForStatus(status) {
  if (status === 'completed') {
    return "Workflow completed successfully";
  }
  if (status === 'failed') {
    return "Workflow failed to complete";
  }
  if (status === 'stopped') {
    return "Workflow was stopped";
  }
  return null;
}
/**
 * Fetches new workflow logs and hands them to WorkflowCoordination.
 *
 * Requests logs after `workflowState.lastPolledLogId`. Only an array response
 * is processed; anything else is reported and ignored. Each entry whose ID is
 * not already known (from `workflowState.logs` or from earlier in the same
 * batch) is passed to `WorkflowCoordination.addLogEntry`, and
 * `lastPolledLogId` advances to the ID of the last processed entry that has
 * one.
 *
 * Errors are logged and not rethrown. When `pollFailCount` is above 3 while
 * the workflow is 'running', the workflow is marked as failed and polling is
 * switched off.
 *
 * @param {string} workflowId - ID of the workflow
 * @returns {Promise<void>}
 */
async function pollWorkflowLogs(workflowId) {
  try {
    const workflowState = WorkflowCoordination.getWorkflowState();

    console.info("polling api request param id:", workflowState.lastPolledLogId);
    const logs = await api.getWorkflowLogs(workflowId, workflowState.lastPolledLogId);
    console.log("Received workflow logs:", logs); // DEBUG

    if (logs && logs.dataStats) {
      WorkflowCoordination.updateDataStats(0, logs.dataStats.bytesReceived || 0);
    }

    if (!Array.isArray(logs)) {
      console.error("ERROR: Log messages in wrong format");
      return;
    }
    if (logs.length === 0) {
      return;
    }
    console.log(`Processing ${logs.length} new logs`); // Debug logging

    const knownLogIds = new Set(workflowState.logs.map(log => log.id));
    logs.forEach(log => {
      // Skip holes/null entries instead of aborting the rest of the batch.
      if (!log || knownLogIds.has(log.id)) {
        return;
      }

      WorkflowCoordination.addLogEntry(
        log.message || 'No message',
        log.type || 'info',
        log.details || null,
        log.agentName || null,
        log.progress
      );

      // Entries without an ID must not reset the polling cursor; entries with
      // an ID are remembered so a repeat within this batch is not added twice.
      if (log.id !== undefined && log.id !== null) {
        knownLogIds.add(log.id);
        workflowState.lastPolledLogId = log.id;
        console.info("polling api next param id:", workflowState.lastPolledLogId);
      }
    });
  } catch (error) {
    console.error("Error polling workflow logs:", error);

    const workflowState = WorkflowCoordination.getWorkflowState();
    // Only mark as failed after repeated errors in running state
    if (workflowState.pollFailCount > 3 && workflowState.status === 'running') {
      WorkflowCoordination.updateWorkflowStatus('failed', {
        message: "Failed to retrieve workflow logs",
        systemMessage: "Error communicating with server"
      });
      workflowState.pollActive = false;
    }
  }
}
/**
 * Fetches new chat messages of a workflow and adds them to the chat.
 *
 * Requests messages after `workflowState.lastPolledMessageId`. The response
 * may be an array or an object with a `messages` property. Every message
 * whose ID is not yet in `workflowState.chatMessages` is normalized, passed to
 * `WorkflowCoordination.addChatMessage`, and recorded as the last polled
 * message. A new message with status 'last' switches polling off.
 *
 * Errors are logged and not rethrown.
 *
 * @param {string} workflowId - ID of the workflow
 * @returns {Promise<void>}
 */
async function pollWorkflowMessages(workflowId) {
  try {
    const workflowState = WorkflowCoordination.getWorkflowState();
    const messagesResponse = await api.getWorkflowMessages(workflowId, workflowState.lastPolledMessageId);

    // Accept both a bare array and an envelope of the form { messages: [...] }.
    let incoming;
    if (Array.isArray(messagesResponse)) {
      incoming = messagesResponse;
    } else if (messagesResponse && messagesResponse.messages) {
      incoming = messagesResponse.messages;
    } else {
      incoming = [];
    }

    if (!incoming || incoming.length === 0) {
      return;
    }

    // IDs already shown in the chat when this poll started.
    const shownIds = new Set(workflowState.chatMessages.map(msg => msg.id));

    incoming.forEach(message => {
      if (shownIds.has(message.id)) {
        return;
      }

      WorkflowCoordination.addChatMessage({
        id: message.id,
        agentName: message.agentName || '',
        content: message.content || '',
        role: message.role || '',
        timestamp: message.startedAt || message.timestamp || new Date().toISOString(),
        documents: message.documents || [],
        status: message.status || null
      });
      workflowState.lastPolledMessageId = message.id;

      // The message flagged 'last' marks the end of the conversation turn.
      if (message.status === 'last') {
        console.log("Last message found, stopping polling");
        workflowState.pollActive = false;
      }
    });
  } catch (error) {
    console.error("Error polling workflow messages:", error);
  }
}
/**
 * Removes a message from a workflow.
 *
 * Returns false without calling the API when either ID is missing. An error
 * whose message contains "404" counts as success, since the message is then
 * already gone; any other error yields false.
 *
 * @param {string} workflowId - ID of the workflow
 * @param {string} messageId - ID of the message to delete
 * @returns {Promise<boolean>} - True if successful, false otherwise
 */
async function deleteWorkflowMessage(workflowId, messageId) {
  const hasBothIds = Boolean(workflowId) && Boolean(messageId);
  if (!hasBothIds) {
    console.error("Invalid parameters for deleteWorkflowMessage");
    return false;
  }

  try {
    const success = await api.deleteWorkflowMessage(workflowId, messageId);
    console.log("Delete message response:", success);
    // Normalize whatever the API resolved with to a strict boolean.
    return !!success;
  } catch (error) {
    console.error("Error deleting message:", error);

    const alreadyGone = error.message && error.message.includes("404");
    if (alreadyGone) {
      console.log("Message not found (404), considering it deleted");
      return true;
    }
    return false;
  }
}
/**
 * Removes a file from a message of a workflow.
 *
 * An error whose message contains "404" or "not found" counts as success, so
 * the caller can still remove the file from the UI; any other error yields
 * false.
 *
 * @param {string} workflowId - ID of the workflow
 * @param {string} messageId - ID of the message
 * @param {string} fileId - ID of the file to delete
 * @returns {Promise<boolean>} - True if successful, false otherwise
 */
async function deleteFileFromMessage(workflowId, messageId, fileId) {
  try {
    console.log(`Deleting file from message: workflow=${workflowId}, message=${messageId}, file=${fileId}`);

    const success = await api.deleteFileFromMessage(workflowId, messageId, fileId);
    console.log("File deletion response:", success);

    // Normalize whatever the API resolved with to a strict boolean.
    return !!success;
  } catch (error) {
    console.error("Error deleting file from message:", error);

    const text = error.message;
    const fileMissing = text && (text.includes("404") || text.includes("not found"));
    if (!fileMissing) {
      return false;
    }

    console.log("File not found, removing from UI");
    return true;
  }
}
// Export functions
export {
initDataLayer,
createWorkflow,
submitUserInput,
stopWorkflow,
pollWorkflowStatus,
pollWorkflowLogs,
pollWorkflowMessages,
uploadAndAddFile,
deleteWorkflowMessage,
deleteFileFromMessage
};