ui-nyla/src/hooks/useChatbot.ts

781 lines
27 KiB
TypeScript

import { useState, useCallback, useRef, useMemo, useEffect } from 'react';
import { useApiRequest } from './useApi';
import api from '../api';
import {
startChatbotStreamApi,
stopChatbotApi,
getChatbotThreadsApi,
getChatbotThreadApi,
deleteChatbotWorkflowApi,
type ChatDataItem,
type StartChatbotRequest,
type ChatbotWorkflow
} from '../api/chatbotApi';
import { Message } from '../components/UiComponents/Messages/MessagesTypes';
// Simple sort function for messages
const sortMessages = (a: Message, b: Message) => {
if (a.publishedAt !== undefined && b.publishedAt !== undefined) {
return a.publishedAt - b.publishedAt;
}
if (a.publishedAt !== undefined) return -1;
if (b.publishedAt !== undefined) return 1;
if (a.sequenceNr !== undefined && b.sequenceNr !== undefined) {
return a.sequenceNr - b.sequenceNr;
}
return 0;
};
export function useChatbot() {
const [inputValue, setInputValue] = useState<string>('');
const [messages, setMessages] = useState<Message[]>([]);
const [workflowId, setWorkflowId] = useState<string | null>(null);
const [isRunning, setIsRunning] = useState<boolean>(false);
const [isSubmitting, setIsSubmitting] = useState<boolean>(false);
const [error, setError] = useState<string | null>(null);
// File upload state
const [pendingFileIds, setPendingFileIds] = useState<string[]>([]);
const pendingFileIdsRef = useRef<string[]>([]); // Ref to avoid closure issues
const [uploadingFile, setUploadingFile] = useState<boolean>(false);
const [uploadError, setUploadError] = useState<string | null>(null);
const [uploadedFiles, setUploadedFiles] = useState<Array<{ fileId: string; fileName: string }>>([]);
// Chat history state
const [threads, setThreads] = useState<ChatbotWorkflow[]>([]);
const [selectedThreadId, setSelectedThreadId] = useState<string | null>(null);
const [threadsLoading, setThreadsLoading] = useState<boolean>(false);
const [threadsError, setThreadsError] = useState<string | null>(null);
const [deletingThreads, setDeletingThreads] = useState<Set<string>>(new Set());
const { request } = useApiRequest();
const streamAbortControllerRef = useRef<AbortController | null>(null);
const processedMessageIdsRef = useRef<Set<string>>(new Set());
const thinkingMessageIdRef = useRef<string | null>(null);
const thinkingLogsRef = useRef<string[]>([]); // Use ref instead of state to avoid batching
const logQueueRef = useRef<string[]>([]); // Queue for logs to process one by one
const isProcessingLogsRef = useRef<boolean>(false); // Flag to prevent concurrent processing
const processedLogsRef = useRef<Set<string>>(new Set()); // Track processed logs to prevent duplicates
// Clear processed message IDs when workflow changes
const clearProcessedMessages = useCallback(() => {
processedMessageIdsRef.current.clear();
}, []);
// Clear thinking message when a new assistant message arrives
const clearThinkingMessage = useCallback(() => {
// Clear log queue and stop processing
logQueueRef.current = [];
isProcessingLogsRef.current = false;
processedLogsRef.current.clear(); // Clear processed logs tracking
// Reset thinking message refs
const thinkingId = thinkingMessageIdRef.current;
thinkingMessageIdRef.current = null;
thinkingLogsRef.current = [];
// Remove ALL thinking messages (not just the one with current ID)
// This handles cases where multiple thinking messages might exist
setMessages(prevMessages => {
return prevMessages.filter(m => m.status !== 'thinking');
});
}, []);
// Process logs from queue one by one (progressive display)
const processLogQueue = useCallback(() => {
if (isProcessingLogsRef.current || logQueueRef.current.length === 0) {
return;
}
isProcessingLogsRef.current = true;
const logMessage = logQueueRef.current.shift()!;
// Add log to accumulated logs
thinkingLogsRef.current = [...thinkingLogsRef.current, logMessage];
// Get or create thinking message ID
const thinkingId = thinkingMessageIdRef.current || `thinking-${Date.now()}`;
thinkingMessageIdRef.current = thinkingId;
// Create/update thinking message with all accumulated logs
// Format logs as Markdown list items so each log appears on a separate line
const formattedLogs = thinkingLogsRef.current
.map(log => `- ${log.trim()}`)
.join('\n');
const thinkingMessage: Message = {
id: thinkingId,
workflowId: workflowId || '',
role: 'assistant',
message: formattedLogs,
publishedAt: Date.now() - 1,
status: 'thinking'
};
// Update messages immediately
setMessages(prevMessages => {
// Remove ALL thinking messages first (to prevent duplicates from previous workflows)
const filtered = prevMessages.filter(m => m.status !== 'thinking');
// Add updated thinking message
const updated = [...filtered, thinkingMessage];
return updated.sort(sortMessages);
});
// Process next log after a small delay (progressive display)
if (logQueueRef.current.length > 0) {
setTimeout(() => {
isProcessingLogsRef.current = false;
processLogQueue();
}, 50); // Small delay between logs for progressive display
} else {
isProcessingLogsRef.current = false;
}
}, [workflowId]);
// Add a single log to thinking message (progressive display)
const addLogToThinkingMessage = useCallback((logMessage: string, createdAt?: number) => {
// Create a unique key for this log message to detect duplicates
// Use content + createdAt timestamp if available, otherwise use current time
const timestamp = createdAt || Date.now();
const logKey = `${logMessage.trim()}_${timestamp}`;
// Skip if this log was already processed
if (processedLogsRef.current.has(logKey)) {
console.log('[useChatbot] Skipping duplicate log:', logMessage.substring(0, 50) + '...');
return;
}
// Mark as processed
processedLogsRef.current.add(logKey);
// Add log to queue
logQueueRef.current.push(logMessage);
// Start processing if not already processing
if (!isProcessingLogsRef.current) {
processLogQueue();
}
}, [processLogQueue]);
// Process SSE event and update messages
const processChatDataItem = useCallback((item: ChatDataItem) => {
// Log the actual streamed response for debugging
console.log('[useChatbot] Streamed item:', {
type: item.type,
createdAt: item.createdAt,
item: item.item
});
if (item.type === 'log' && item.item) {
// Process log item - add to thinking message one at a time
const logData = item.item as any;
const logMessage = logData.message || logData.text || '';
console.log('[useChatbot] Processing log:', {
message: logMessage.substring(0, 100) + (logMessage.length > 100 ? '...' : ''),
createdAt: item.createdAt,
fullItem: item
});
if (logMessage) {
// Add log immediately (progressive display) with createdAt for deduplication
addLogToThinkingMessage(logMessage, item.createdAt);
}
} else if (item.type === 'message' && item.item) {
const messageData = item.item as any;
// Ensure message has required fields
if (!messageData.id) {
console.warn('⚠️ Invalid message item (missing id):', messageData);
return;
}
// Check if this is an assistant message that should clear thinking message
const messageRole = messageData.role?.toLowerCase();
const isAssistantMessage = messageRole === 'assistant' || messageRole === 'ai' || messageRole === 'system';
// ALWAYS clear thinking message when ANY message arrives (not just assistant)
// The thinking message should disappear when the real message comes
const thinkingId = thinkingMessageIdRef.current;
// Check if we've already processed this message
const messageId = messageData.id;
// Always clear thinking messages when a real message arrives
clearThinkingMessage();
if (processedMessageIdsRef.current.has(messageId)) {
// Update existing message
setMessages(prevMessages => {
const existingIndex = prevMessages.findIndex(m => m.id === messageId);
if (existingIndex >= 0) {
const updated = [...prevMessages];
updated[existingIndex] = messageData as Message;
return updated.sort(sortMessages);
}
return prevMessages;
});
} else {
// Add new message
processedMessageIdsRef.current.add(messageId);
setMessages(prevMessages => {
// Add new message (thinking messages already cleared by clearThinkingMessage)
const updated = [...prevMessages, messageData as Message];
return updated.sort(sortMessages);
});
}
}
}, [addLogToThinkingMessage]);
// Load all threads (with loading state)
const loadThreads = useCallback(async () => {
try {
setThreadsLoading(true);
setThreadsError(null);
const result = await getChatbotThreadsApi(request);
// Sort threads by lastActivity (newest first)
const sortedThreads = [...result.items].sort((a, b) => {
const aTime = a.lastActivity || a.startedAt || 0;
const bTime = b.lastActivity || b.startedAt || 0;
return bTime - aTime; // Descending order (newest first)
});
setThreads(sortedThreads);
} catch (err: any) {
console.error('Error loading threads:', err);
setThreadsError(err.message || 'Failed to load threads');
} finally {
setThreadsLoading(false);
}
}, [request]);
// Load threads silently (without loading state) - keeps existing threads visible
const loadThreadsSilently = useCallback(async () => {
try {
setThreadsError(null);
const result = await getChatbotThreadsApi(request);
// Sort threads by lastActivity (newest first)
const sortedThreads = [...result.items].sort((a, b) => {
const aTime = a.lastActivity || a.startedAt || 0;
const bTime = b.lastActivity || b.startedAt || 0;
return bTime - aTime; // Descending order (newest first)
});
setThreads(sortedThreads);
} catch (err: any) {
console.error('Error loading threads silently:', err);
setThreadsError(err.message || 'Failed to load threads');
}
}, [request]);
// Load a specific workflow and its messages
const loadWorkflow = useCallback(async (workflowIdToLoad: string) => {
try {
setError(null);
const result = await getChatbotThreadApi(request, workflowIdToLoad);
console.log('[loadWorkflow] Full result:', JSON.stringify(result, null, 2));
console.log('[loadWorkflow] chatData structure:', {
isArray: Array.isArray(result.chatData),
isObject: result.chatData && typeof result.chatData === 'object',
chatDataKeys: result.chatData && typeof result.chatData === 'object' ? Object.keys(result.chatData) : [],
hasItems: result.chatData?.items !== undefined,
itemsLength: Array.isArray(result.chatData?.items) ? result.chatData.items.length : 'not an array'
});
// Convert chatData to Message[]
const loadedMessages: Message[] = [];
const processedIds = new Set<string>();
// Backend returns chatData as { items: ChatDataItem[] } structure
const chatDataArray: any[] = result.chatData?.items || [];
console.log('[loadWorkflow] Processing chatDataArray:', {
length: chatDataArray.length,
types: chatDataArray.map((item, idx) => ({
index: idx,
type: item?.type,
hasItem: !!item?.item,
keys: item ? Object.keys(item) : []
}))
});
for (const item of chatDataArray) {
// Handle ChatDataItem structure: { type: 'message', createdAt: number, item: Message }
if (item.type === 'message' && item.item) {
const messageData = item.item as any;
if (messageData.id && !processedIds.has(messageData.id)) {
processedIds.add(messageData.id);
loadedMessages.push(messageData as Message);
}
}
// Handle direct Message structure (if item is already a message)
else if (item.role && item.message !== undefined) {
// This looks like a Message object directly
if (item.id && !processedIds.has(item.id)) {
processedIds.add(item.id);
loadedMessages.push(item as Message);
}
}
}
console.log('[loadWorkflow] Loaded messages:', {
count: loadedMessages.length,
messageIds: loadedMessages.map(m => m.id),
sampleMessage: loadedMessages.length > 0 ? loadedMessages[0] : null
});
// Sort messages chronologically
const sortedMessages = loadedMessages.sort(sortMessages);
// Update state
setMessages(sortedMessages);
setWorkflowId(workflowIdToLoad);
processedMessageIdsRef.current = processedIds;
console.log('[loadWorkflow] State updated:', {
messagesCount: sortedMessages.length,
workflowId: workflowIdToLoad
});
// Don't refresh threads list here - it causes unwanted loading state
// Threads will be refreshed after new messages are sent/completed
} catch (err: any) {
console.error('Error loading workflow:', err);
setError(err.message || 'Failed to load workflow');
}
}, [request]);
// Select a thread (loads its messages)
const selectThread = useCallback(async (workflowIdToSelect: string) => {
setSelectedThreadId(workflowIdToSelect);
await loadWorkflow(workflowIdToSelect);
}, [loadWorkflow]);
// Auto-load threads on initialization
useEffect(() => {
loadThreads();
}, [loadThreads]);
// Handle input change
const onInputChange = useCallback((value: string) => {
setInputValue(value);
}, []);
// Handle file upload
const handleFileUpload = useCallback(async (file: File): Promise<{ success: boolean; data?: any }> => {
setUploadError(null);
setUploadingFile(true);
try {
// Validate file before upload
if (!file || !file.name || file.name.trim() === '') {
throw new Error('Invalid file: File must have a valid name');
}
if (file.size === 0) {
throw new Error('Invalid file: File cannot be empty');
}
const formData = new FormData();
formData.append('file', file);
const response = await api.post('/api/files/upload', formData, {
headers: {
'Content-Type': 'multipart/form-data',
}
});
const fileData = response.data;
// Extract fileId from response
// Backend returns: { message: "...", file: { id: "...", ... }, duplicateType: "..." }
const fileId = fileData?.file?.id || fileData?.id || fileData?.fileId;
if (!fileId) {
console.error('Upload response structure:', fileData);
throw new Error('Upload failed: No file ID returned from server');
}
// Extract file name from response (use storedFileName if available, otherwise original fileName)
const fileName = fileData?.file?.fileName || fileData?.storedFileName || file.name;
// Add to pending file IDs and uploaded files list
setPendingFileIds(prev => {
const updated = [...prev, fileId];
pendingFileIdsRef.current = updated; // Keep ref in sync
return updated;
});
setUploadedFiles(prev => [...prev, { fileId, fileName }]);
return { success: true, data: fileData };
} catch (err: any) {
console.error('File upload failed:', err);
const errorMessage = err.message || 'Failed to upload file';
setUploadError(errorMessage);
return { success: false, error: errorMessage };
} finally {
setUploadingFile(false);
}
}, []);
// Handle file remove (remove from pending list)
const handleFileRemove = useCallback((fileId: string) => {
setPendingFileIds(prev => {
const updated = prev.filter(id => id !== fileId);
pendingFileIdsRef.current = updated; // Keep ref in sync
return updated;
});
setUploadedFiles(prev => prev.filter(f => f.fileId !== fileId));
}, []);
// Stop chatbot workflow
const stopChatbot = useCallback(async () => {
if (!workflowId || !isRunning) {
return;
}
try {
setIsSubmitting(true);
// Abort any ongoing stream
if (streamAbortControllerRef.current) {
streamAbortControllerRef.current.abort();
streamAbortControllerRef.current = null;
}
// Clear thinking message when stopping
clearThinkingMessage();
// Call stop API
await stopChatbotApi(request, workflowId);
setIsRunning(false);
setError(null);
} catch (err: any) {
console.error('Error stopping chatbot:', err);
setError(err.message || 'Failed to stop chatbot');
} finally {
setIsSubmitting(false);
}
}, [workflowId, isRunning, request, clearThinkingMessage]);
// Handle form submit
const handleSubmit = useCallback(async () => {
// If running, stop instead of starting
if (isRunning && workflowId) {
await stopChatbot();
return;
}
const trimmedInput = inputValue.trim();
if (!trimmedInput || isSubmitting) {
return;
}
try {
setIsSubmitting(true);
setError(null);
setIsRunning(true);
// Abort any existing stream
if (streamAbortControllerRef.current) {
streamAbortControllerRef.current.abort();
}
// Create new abort controller for this stream
const abortController = new AbortController();
streamAbortControllerRef.current = abortController;
// Use ref to get current file IDs (avoids closure issues)
const fileIdsToSend = pendingFileIdsRef.current.length > 0
? pendingFileIdsRef.current
: pendingFileIds; // Fallback to state if ref is empty
// Log for debugging
console.log('[handleSubmit] pendingFileIds from state:', pendingFileIds);
console.log('[handleSubmit] pendingFileIds from ref:', pendingFileIdsRef.current);
console.log('[handleSubmit] fileIdsToSend:', fileIdsToSend);
const requestBody: StartChatbotRequest = {
prompt: trimmedInput,
userLanguage: 'en',
...(workflowId && { workflowId })
};
// Always include listFileId if there are any files
if (fileIdsToSend.length > 0) {
requestBody.listFileId = fileIdsToSend;
console.log('[handleSubmit] Added listFileId to requestBody:', fileIdsToSend);
} else {
console.warn('[handleSubmit] No file IDs to send! Check if files were uploaded correctly.');
}
console.log('[handleSubmit] Final requestBody:', JSON.stringify(requestBody, null, 2));
// Track if workflow was created in this request
let workflowCreated = false;
// Clear thinking message when starting a new request
clearThinkingMessage();
processedLogsRef.current.clear(); // Clear processed logs for new request
// Track if this is the first event (to reset isSubmitting)
let firstEventReceived = false;
// Start SSE stream
await startChatbotStreamApi(
requestBody,
(item: ChatDataItem) => {
// Check if stream was aborted
if (abortController.signal.aborted) {
return;
}
// Reset isSubmitting after first event to enable stop button
if (!firstEventReceived) {
firstEventReceived = true;
setIsSubmitting(false);
}
// Process the chat data item
processChatDataItem(item);
// Extract workflow ID from message if available
if (item.type === 'message' && item.item) {
const messageData = item.item as any;
if (messageData.workflowId) {
if (!workflowId) {
// New workflow created - select it automatically
setWorkflowId(messageData.workflowId);
setSelectedThreadId(messageData.workflowId);
workflowCreated = true;
} else {
// Existing workflow - ensure it's selected
setSelectedThreadId(messageData.workflowId);
}
}
}
},
(err: Error) => {
// Only handle error if stream wasn't aborted
if (!abortController.signal.aborted) {
console.error('SSE stream error:', err);
setError(err.message || 'Stream error occurred');
setIsRunning(false);
// Reset isSubmitting if stream fails before first event
if (!firstEventReceived) {
setIsSubmitting(false);
}
// Clear thinking messages on error
clearThinkingMessage();
} else {
// Stream was aborted (stopped) - clear thinking messages
clearThinkingMessage();
}
},
() => {
// Stream completed
if (!abortController.signal.aborted) {
setIsRunning(false);
setInputValue(''); // Clear input on completion
// Clear pending file IDs after successful submission (files are now part of conversation)
setPendingFileIds([]);
pendingFileIdsRef.current = []; // Clear ref too
setUploadedFiles([]);
// Clear thinking message on completion (final message should have cleared it, but ensure cleanup)
clearThinkingMessage();
// Refresh threads list after message completion (silently, without loading state)
setTimeout(() => {
loadThreadsSilently();
}, 100);
} else {
// Stream was aborted (stopped) - clear thinking messages
clearThinkingMessage();
}
}
);
// Clear input after starting (optimistic)
if (!workflowId) {
setInputValue('');
}
} catch (err: any) {
console.error('Error starting chatbot:', err);
setError(err.message || 'Failed to start chatbot');
setIsRunning(false);
// Clear thinking messages on error
clearThinkingMessage();
} finally {
setIsSubmitting(false);
streamAbortControllerRef.current = null;
}
}, [inputValue, workflowId, isRunning, isSubmitting, stopChatbot, processChatDataItem, clearThinkingMessage, loadThreads, pendingFileIds]);
// Delete a chatbot workflow
const handleDeleteThread = useCallback(async (workflowIdToDelete: string): Promise<boolean> => {
try {
// Add to deleting set
setDeletingThreads(prev => new Set(prev).add(workflowIdToDelete));
// If deleting the selected thread, clear selection and messages
if (selectedThreadId === workflowIdToDelete) {
setMessages([]);
setWorkflowId(null);
setSelectedThreadId(null);
}
// Call delete API
await deleteChatbotWorkflowApi(request, workflowIdToDelete);
// Remove from threads list optimistically
setThreads(prev => prev.filter(t => t.id !== workflowIdToDelete));
return true;
} catch (err: any) {
console.error('Error deleting thread:', err);
throw err;
} finally {
// Remove from deleting set
setDeletingThreads(prev => {
const next = new Set(prev);
next.delete(workflowIdToDelete);
return next;
});
}
}, [request, selectedThreadId]);
// Optimistically remove thread from list
const removeThreadOptimistically = useCallback((workflowId: string) => {
setThreads(prev => prev.filter(t => t.id !== workflowId));
// If deleting the selected thread, clear selection
if (selectedThreadId === workflowId) {
setSelectedThreadId(null);
setMessages([]);
setWorkflowId(null);
}
}, [selectedThreadId]);
// Start a new chat (clears selection and messages)
const startNewChat = useCallback(() => {
// Abort any ongoing stream
if (streamAbortControllerRef.current) {
streamAbortControllerRef.current.abort();
streamAbortControllerRef.current = null;
}
// Clear messages and selection
setMessages([]);
setWorkflowId(null);
setSelectedThreadId(null);
setError(null);
setInputValue('');
setPendingFileIds([]);
pendingFileIdsRef.current = [];
setUploadedFiles([]);
thinkingLogsRef.current = [];
thinkingMessageIdRef.current = null;
processedLogsRef.current.clear();
clearProcessedMessages();
}, [clearProcessedMessages]);
// Reset chatbot state
const resetChatbot = useCallback(() => {
// Abort any ongoing stream
if (streamAbortControllerRef.current) {
streamAbortControllerRef.current.abort();
streamAbortControllerRef.current = null;
}
setMessages([]);
setWorkflowId(null);
setSelectedThreadId(null);
setIsRunning(false);
setIsSubmitting(false);
setError(null);
setInputValue('');
setPendingFileIds([]);
pendingFileIdsRef.current = [];
setUploadedFiles([]);
thinkingLogsRef.current = [];
thinkingMessageIdRef.current = null;
processedLogsRef.current.clear();
clearProcessedMessages();
}, [clearProcessedMessages]);
// Cleanup on unmount
const cleanup = useCallback(() => {
if (streamAbortControllerRef.current) {
streamAbortControllerRef.current.abort();
streamAbortControllerRef.current = null;
}
logQueueRef.current = [];
isProcessingLogsRef.current = false;
processedLogsRef.current.clear();
}, []);
// Memoized display messages
const displayMessages = useMemo(() => {
return messages.sort(sortMessages);
}, [messages]);
return {
// GenericDataHook interface
data: [],
loading: false,
error,
// Input form interface
inputValue,
onInputChange,
handleSubmit,
isSubmitting,
// Workflow state
workflowId: workflowId || undefined,
isRunning,
// Messages
messages: displayMessages,
// Chat history state
threads,
selectedThreadId,
threadsLoading,
threadsError,
// Chat history methods
selectThread,
loadThreads,
// Delete methods
handleDelete: handleDeleteThread,
removeOptimistically: removeThreadOptimistically,
deletingItems: deletingThreads,
refetch: loadThreads,
// Additional methods
stopChatbot,
resetChatbot,
startNewChat,
cleanup,
// File upload interface
handleFileUpload,
handleUpload: handleFileUpload, // Alias for compatibility with DragDropOverlay
handleFileRemove,
pendingFileIds,
uploadedFiles,
uploadingFile,
uploadError
};
}
export function createChatbotHook() {
return () => useChatbot();
}